# The Missing Manual for Signals: State Management for Python Developers

*Jun 13, 2025 · 14 min read*

A practical guide to reactive state management in Python.

## Introduction

I maintain reaktiv. When I demo it to Python teams, I get the same response: "Why do I need this? I can just call functions when things change."

Fair question. Python has excellent patterns for coordinating state changes. You can trigger updates manually, use the observer pattern, or set up event systems. Most Python applications handle state coordination just fine.

But some don't. If you're building systems where state changes cascade through multiple components, where derived values need to stay synchronized, or where manual coordination is becoming a maintenance burden, signals might solve real problems for you.

Frontend developers recognize the pattern immediately. They've dealt with forgetting to trigger updates when state changes, or having component state get out of sync. Signals solve the "forgot to update X when Y changed" class of bugs.

This manual shows you when that coordination problem is worth solving with reactive programming, and when it's not.

## What You'll Learn

* When reactive state management solves real problems (and when it doesn't)
* How to adopt signals incrementally in existing systems
* Patterns that work in production Python applications

Let's start with what breaks as state coordination scales.

## Table of Contents

1. The Problem with Traditional State Management
2. What Are Signals, Really?
3. The Mental Model Shift
4. When Signals Matter (And When They Don't)
5. Common Patterns and Anti-Patterns
6. Real-World Scenarios
7. Performance Considerations
8. Migration Guide

---

## The Problem with Traditional State Management

As developers, we've all written variations of this code:

```python
class OrderService:
    def __init__(self):
        self.orders = []
        self.total_revenue = 0.0
        self.daily_stats = {}
        self.notification_service = NotificationService()
        self.analytics_service = AnalyticsService()

    def add_order(self, order):
        self.orders.append(order)
        self.total_revenue += order.amount
        self._update_daily_stats(order)
        self._send_notifications(order)
        self._track_analytics(order)

    def _update_daily_stats(self, order):
        date = order.created_at.date()
        if date not in self.daily_stats:
            self.daily_stats[date] = {"count": 0, "revenue": 0.0}
        self.daily_stats[date]["count"] += 1
        self.daily_stats[date]["revenue"] += order.amount

    def _send_notifications(self, order):
        if order.amount > 1000:
            self.notification_service.send_high_value_alert(order)
        if len(self.orders) % 100 == 0:
            self.notification_service.send_milestone_alert(len(self.orders))

    def _track_analytics(self, order):
        self.analytics_service.track_order(order)
        if self.total_revenue > 50000:
            self.analytics_service.track_milestone("revenue_50k")
```

This looks reasonable at first glance. But let's visualize the hidden complexity:

```mermaid
graph TD
    A[add_order called] --> B[Update orders list]
    B --> C[Update total_revenue]
    C --> D[Update daily_stats]
    D --> E[Send notifications]
    E --> F[Track analytics]
    G[Miss one step?] --> H[Silent bugs]
    I[Add new derived state?] --> J[Update every entry point]
    K[Race condition?] --> L[Inconsistent state]
    classDef risk fill:#F44336,color:#fff
    classDef consequence fill:#D32F2F,color:#fff
    class G,I,K risk
    class H,J,L consequence
```

### The Hidden Dependencies

The real problem isn't visible in the code: it's the implicit dependency graph.

```mermaid
graph LR
    Orders[orders] --> Revenue[total_revenue]
    Orders --> Stats[daily_stats]
    Orders --> Notifications[notifications]
    Orders --> Analytics[analytics]
    Revenue --> Analytics
    Stats --> Notifications
    classDef implicit stroke-dasharray: 5 5
    class Revenue,Stats,Notifications,Analytics implicit
```

These dependencies are implicit and manually maintained. Every time `orders` changes, you must remember to update all dependent values in the correct order.

### 1. Tight Coupling Through Side Effects

Every time we add an order, we must remember to update:

* Total revenue
* Daily statistics
* Notifications
* Analytics
* Any future derived state

Miss one update? Silent bugs. Add a new derived value? Modify every entry point.

### 2. Implicit Dependencies

The relationship between orders and derived state is buried in imperative code. New developers (or future you) must trace through method calls to understand what depends on what.

### 3. Inconsistent State Windows

Between the moment `orders.append(order)` executes and `total_revenue += order.amount` completes, your system is in an inconsistent state. In concurrent environments, this creates race conditions.

### 4. Testing Complexity

Testing requires mocking all the side effects, or carefully orchestrating partial updates. Want to test just the revenue calculation? Good luck isolating it.

### 5. Performance Blind Spots

Every order addition triggers every derived calculation, even if only some values are actually needed. There's no easy way to optimize without restructuring.

---

## What Are Signals, Really?

Signals aren't just "reactive variables." They're a dependency graph abstraction that inverts the control flow of state management.

**Important:** Signals are value containers, not event streams. If you're thinking "this sounds like event listeners," there's a key difference. Signals hold current state and create a snapshot of your application at any point in time. When you call `signal()`, you get the current value, not a subscription to future events.

```python
# Signal: value container (current state)
user_count = Signal(42)
print(user_count())  # 42 - current value, right now

# Event listener (JavaScript): reacts to future events
# button.addEventListener('click', handler)  # waits for future clicks
```

This distinction matters. Signals create a state graph: a snapshot of how values relate to each other at any moment. Event listeners create reaction patterns: responses to things happening over time.
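To see the snapshot semantics end to end, here is a minimal, runnable sketch with reaktiv (the variable names are illustrative). Reading a signal or a computed always returns the current value; nothing is subscribed anywhere.

```python
from reaktiv import Signal, Computed

user_count = Signal(42)
doubled = Computed(lambda: user_count() * 2)

# Reads return the current snapshot; no callbacks are registered
print(user_count())  # 42
print(doubled())     # 84

user_count.set(100)  # replace the value in the container
print(doubled())     # 200 - the graph reflects the new snapshot
```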
### The Dependency Graph Model

```mermaid
graph LR
    subgraph "Traditional Approach (Push-based)"
        A1[X changes] --> B1[Manually update Y]
        B1 --> C1[Manually update Z]
        C1 --> D1[Manually notify observers]
    end
    subgraph "Signals Approach (Pull-based)"
        A2[X = Signal] --> B2[Y = Computed from X]
        A2 --> C2["Z = Computed from X, Y"]
        B2 --> C2
        C2 --> D2["Effect observes X, Y, Z"]
        E2["X.set(new_value)"] --> F2[Y and Z update automatically]
    end
    style A1 fill:#F44336,color:#fff
    style A2 fill:#4CAF50,color:#fff
```

Instead of push-based updates (imperative):

```python
# When X changes, manually update Y and Z
x = new_value
y = calculate_y(x)
z = calculate_z(x, y)
notify_observers(x, y, z)
```

Signals provide pull-based derivation (declarative):

```python
# Define relationships once
x = Signal(initial_value)
y = Computed(lambda: calculate_y(x()))
z = Computed(lambda: calculate_z(x(), y()))
notify_effect = Effect(lambda: notify_observers(x(), y(), z()))

# Updates happen automatically
x.set(new_value)  # y, z, and notifications update automatically
```

### The Three Primitives

```mermaid
graph LR
    subgraph "Signal Primitives"
        A["Signal<br/>Holds value<br/>Notifies changes"]
        B["Computed<br/>Derives from others<br/>Caches result"]
        C["Effect<br/>Performs side effects<br/>Runs when deps change"]
    end
    A --> B
    A --> C
    B --> B2[Other Computed]
    style A fill:#2196F3,color:#fff
    style B fill:#9C27B0,color:#fff
    style C fill:#FF9800,color:#fff
```

Think of them as:

* **Signal**: a cell in a spreadsheet that holds a value
* **Computed**: a formula cell that derives from other cells (e.g., `=A1+B1`)
* **Effect**: a macro that runs when referenced cells change

The key insight: your entire application state becomes a live spreadsheet where changing one cell automatically updates all dependent cells.

### State Snapshots vs Event Reactions

```mermaid
graph LR
    subgraph "Signals: State Snapshot"
        S1[user: Signal] --> S2["name: 'John'"]
        S1 --> S3["age: 30"]
        SC1[user_display: Computed] --> S4["'John (30)'"]
        S1 --> SC1
        Note1["Current state, right now"]
    end
    subgraph "Event Listeners: Future Reactions"
        E1[button.addEventListener] --> E2["click handler"]
        E3[window.addEventListener] --> E4["resize handler"]
        E5[socket.on] --> E6["message handler"]
        E7["...waiting for events"]
        Note2["Waiting for future events"]
    end
    style S1 fill:#2196F3,color:#fff
    style SC1 fill:#9C27B0,color:#fff
    style E1 fill:#FF9800,color:#fff
```

When you access a signal, you're asking: "What's the current state?" When you set up an event listener, you're saying: "Do this when something happens later."

### Example: Order Processing with Signals

```mermaid
graph TD
    Orders[orders: Signal] --> Revenue[total_revenue: Computed]
    Orders --> Stats[daily_stats: Computed]
    Orders --> Count[order_count: Computed]
    Revenue --> NotifEffect[notification_effect: Effect]
    Stats --> NotifEffect
    Count --> NotifEffect
    Orders --> AnalyticsEffect[analytics_effect: Effect]
    Revenue --> AnalyticsEffect
    classDef computed fill:#9C27B0,color:#fff
    classDef effect fill:#FF9800,color:#fff
    style Orders fill:#2196F3,color:#fff
    class Revenue,Stats,Count computed
    class NotifEffect,AnalyticsEffect effect
```

---

## The Mental Model Shift

The hardest part about adopting signals isn't the API; it's the mental model shift from imperative to declarative state management.
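Before diving into the shift itself, the spreadsheet analogy above maps directly onto the three primitives. A minimal sketch (the cell names are illustrative; note that a reaktiv effect runs once when created so it can discover its dependencies):

```python
from reaktiv import Signal, Computed, Effect

# Value cells
a1 = Signal(10)
b1 = Signal(20)

# Formula cell: =A1+B1
total = Computed(lambda: a1() + b1())

# Macro: re-runs whenever a referenced cell changes
reporter = Effect(lambda: print(f"total = {total()}"))

a1.set(15)  # total becomes 35 and the effect reports it
```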
### Before vs After: Visualization

```mermaid
flowchart LR
    subgraph "Imperative Thinking (Before)"
        A1[User Action] --> B1["Step 1: Update user"]
        B1 --> C1["Step 2: Update stats"]
        C1 --> D1["Step 3: Check achievements"]
        D1 --> E1["Step 4: Update leaderboard"]
        E1 --> F1["Step 5: Send notification"]
        F1 --> G1["Step 6: Log activity"]
        H1[Easy to miss steps]
        I1[Order matters]
        J1[Hard to test parts]
    end
    subgraph "Declarative Thinking (After)"
        A2[user_action: Signal]
        A2 --> B2[user_stats: Computed]
        A2 --> C2[achievements: Computed]
        B2 --> C2
        B2 --> D2[leaderboard: Computed]
        C2 --> E2[notification_effect: Effect]
        A2 --> F2[logging_effect: Effect]
        G2[Relationships declared once]
        H2[Order handled automatically]
        I2[Easy to test individually]
    end
    style A1 fill:#F44336,color:#fff
    style A2 fill:#4CAF50,color:#fff
```

### Before: Imperative Thinking

"When this happens, do these things in this order."

```python
from datetime import datetime

def process_user_action(user_id, action):
    user = get_user(user_id)
    user.last_action = action
    user.last_active = datetime.now()

    update_user_stats(user)
    check_achievement_progress(user)
    update_leaderboard(user)
    send_activity_notification(user)
    log_user_activity(user, action)
```

### After: Declarative Thinking

"These relationships always hold true."

```python
from datetime import datetime
from reaktiv import Signal, Computed, Effect

# Define relationships once
user_action = Signal(None)
user_last_active = Computed(lambda: datetime.now() if user_action() else None)
user_stats = Computed(lambda: calculate_stats(user_action()))
achievements = Computed(lambda: check_achievements(user_stats()))
leaderboard_position = Computed(lambda: calculate_position(user_stats()))

# Effects for side effects
notify_effect = Effect(lambda: send_notification(user_stats()) if user_action() else None)
log_effect = Effect(lambda: log_activity(user_action()) if user_action() else None)

# Usage becomes simple
def process_user_action(user_id, action):
    user_action.set(action)
    # Everything else happens automatically
```

### Dependency Flow Visualization

```mermaid
graph LR
    subgraph "Signal Dependency Flow"
        UA[user_action] --> ULA[user_last_active]
        UA --> US[user_stats]
        US --> ACH[achievements]
        US --> LB[leaderboard_position]
        US --> NE[notification_effect]
        UA --> LE[logging_effect]
    end
    subgraph "Change Propagation"
        Change["user_action.set()"] --> Trigger[Triggers computation chain]
        Trigger --> Auto[All dependent values update automatically]
    end
    classDef computed fill:#9C27B0,color:#fff
    classDef effect fill:#FF9800,color:#fff
    style UA fill:#2196F3,color:#fff
    class US,ACH,LB,ULA computed
    class NE,LE effect
```

---

## When Signals Matter (And When They Don't)

### Signals Shine When:

Four visual patterns where signals pay off, with a sketch of the last one after the diagram:

```mermaid
graph LR
    subgraph "Complex Derived State"
        UP[user_profile] --> UPerm[user_permissions]
        UP --> UTheme[ui_theme]
        UPerm --> Dashboard[dashboard_config]
        UTheme --> Dashboard
    end
    subgraph "Cross-Cutting Concerns"
        Config[app_config] --> DB[database_pool]
        Config --> Cache[cache_client]
        Config --> Logger[logger_config]
        Config --> Monitor[monitoring]
    end
    subgraph "Real-Time Data Flows"
        Raw[raw_market_data] --> Norm[normalized_data]
        Norm --> Risk[risk_metrics]
        Risk --> Alerts[alerts]
        Alerts --> Broadcast[broadcast_effect]
    end
    subgraph "State Synchronization"
        Model[model_data] --> JSON[json_representation]
        Model --> XML[xml_representation]
        Model --> DB2[database_record]
        JSON --> CacheEffect[cache_effect]
    end
    classDef source fill:#2196F3,color:#fff
    class UP,Config,Raw,Model source
```
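Here is the "State Synchronization" pattern from the diagram as a minimal sketch. The representations and the cache are stand-ins; the point is that every view derives from one source of truth and can never drift from it.

```python
import json
from reaktiv import Signal, Computed, Effect

model_data = Signal({"id": 1, "name": "Ada"})

# Every representation derives from the same source of truth
json_repr = Computed(lambda: json.dumps(model_data()))
summary = Computed(lambda: f"{model_data()['name']} (#{model_data()['id']})")

# Side effect: keep a cache in sync with the JSON view
cache: dict = {}
cache_effect = Effect(lambda: cache.update({"model": json_repr()}))

model_data.set({"id": 1, "name": "Ada Lovelace"})  # all views update together
```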
### Signals Are Overkill When:

```mermaid
graph LR
    subgraph "Avoid Signals For"
        A[Simple Linear Transformations]
        B[One-Shot Calculations]
        C[Pure Request-Response Patterns]
    end
    subgraph "Use Regular Functions"
        D["validate → enrich → save"]
        E["calculate_tax(order)"]
        F["HTTP GET /users/123"]
    end
    A --> D
    B --> E
    C --> F
    classDef avoid fill:#F44336,color:#fff
    classDef use fill:#4CAF50,color:#fff
    class A,B,C avoid
    class D,E,F use
```

---

## Common Patterns and Anti-Patterns

### Pattern: Configuration Cascades

```mermaid
graph LR
    Config[config: Signal] --> DBConfig[db_config: Computed]
    Config --> RedisConfig[redis_config: Computed]
    DBConfig --> DBPool[db_pool: Computed]
    RedisConfig --> CacheClient[cache_client: Computed]
    Config --> MonitorEffect[monitoring_effect: Effect]
    subgraph "Good: Grouped Configuration"
        GoodConfig["{host, port, user, password}"]
    end
    subgraph "Bad: Over-granular Signals"
        BadHost[db_host: Signal]
        BadPort[db_port: Signal]
        BadUser[db_user: Signal]
        BadPass[db_password: Signal]
    end
    classDef bad fill:#F44336,color:#fff
    style Config fill:#2196F3,color:#fff
    style GoodConfig fill:#4CAF50,color:#fff
    class BadHost,BadPort,BadUser,BadPass bad
```

```python
# Good: Grouped configuration
from reaktiv import Signal, Computed

# Good: Single grouped signal
app_config = Signal({
    "database": {"host": "localhost", "port": 5432, "user": "app", "password": "secret"},
    "redis": {"host": "localhost", "port": 6379},
    "api": {"timeout": 30, "retries": 3},
})

# Derived configs
db_config = Computed(lambda: app_config().get("database", {}))
redis_config = Computed(lambda: app_config().get("redis", {}))

# Connection pools derived from configs
db_pool = Computed(lambda: create_db_pool(**db_config()))
redis_client = Computed(lambda: create_redis_client(**redis_config()))

# Bad: Overly granular signals
db_host = Signal("localhost")
db_port = Signal(5432)
db_user = Signal("app")
db_password = Signal("secret")
# This approach makes it harder to update related settings together
```

### Pattern: Data Processing Pipelines

```mermaid
graph LR
    Raw[raw_data: Signal] --> Clean[cleaned_data: Computed]
    Clean --> Agg[aggregated_data: Computed]
    Agg --> Format[formatted_output: Computed]
    Format --> CacheEffect[cache_effect: Effect]
    subgraph "Anti-Pattern: Side Effects in Computed"
        BadComputed[computed_with_api_call]
        BadComputed -.-> API[expensive_api_call]
    end
    subgraph "Better: Effects for Side Effects"
        GoodTrigger[api_trigger: Signal]
        GoodTrigger --> GoodEffect[api_effect: Effect]
    end
    classDef computed fill:#9C27B0,color:#fff
    classDef good fill:#4CAF50,color:#fff
    style Raw fill:#2196F3,color:#fff
    class Clean,Agg,Format computed
    style CacheEffect fill:#FF9800,color:#fff
    style BadComputed fill:#F44336,color:#fff
    class GoodTrigger,GoodEffect good
```

```python
# Good: Clean separation of computation and effects
from reaktiv import Signal, Computed, Effect

# Data pipeline
raw_data = Signal([])
cleaned_data = Computed(lambda: [clean_item(item) for item in raw_data()])
aggregated_data = Computed(lambda: aggregate_by_category(cleaned_data()))
formatted_output = Computed(lambda: format_for_display(aggregated_data()))

# Good: Side effects in Effects only
cache_effect = Effect(lambda: cache_service.store("agg_data", formatted_output()))

# Bad: Side effects in Computed
def bad_computed_with_api_call():
    data = expensive_api_call()  # Side effect!
    return process_data(data)
```
```python
# Better: Use a separate Signal and Effect
api_trigger = Signal(False)

def api_effect():
    if api_trigger():
        data = expensive_api_call()
        processed = process_data(data)
        store_result(processed)

api_effect_instance = Effect(api_effect)
```

### Pattern: Event Sourcing Integration

Good: signals as event processors.

```python
from reaktiv import Signal, Computed

event_stream = Signal([])
current_state = Computed(lambda: reduce_events(event_stream()))
projections = {
    "user_stats": Computed(lambda: project_user_stats(event_stream())),
    "daily_summary": Computed(lambda: project_daily_summary(event_stream())),
}

# Append events; projections update automatically
def add_event(event):
    event_stream.update(lambda events: events + [event])
```

```mermaid
graph TD
    EventStream[event_stream: Signal] --> CurrentState[current_state: Computed]
    EventStream --> UserStats[user_stats: Computed]
    EventStream --> DailySummary[daily_summary: Computed]
    AddEvent[add_event] --> EventStream
    classDef computed fill:#9C27B0,color:#fff
    style EventStream fill:#2196F3,color:#fff
    class CurrentState,UserStats,DailySummary computed
    style AddEvent fill:#4CAF50,color:#fff
```

---

## Real-World Scenarios

### Scenario 1: Microservice Configuration Management

```mermaid
graph TB
    subgraph "Configuration Sources"
        ENV[env_config: Signal]
        FILE[file_config: Signal]
        REMOTE[remote_config: Signal]
    end
    subgraph "Merged Configuration"
        ENV --> EFFECTIVE[effective_config: Computed]
        FILE --> EFFECTIVE
        REMOTE --> EFFECTIVE
    end
    subgraph "Service Configs"
        EFFECTIVE --> DBCONFIG[database_config: Computed]
        EFFECTIVE --> REDISCONFIG[redis_config: Computed]
        EFFECTIVE --> FEATURES[feature_flags: Computed]
    end
    subgraph "Service Instances"
        DBCONFIG --> DBPOOL[db_pool: Computed]
        REDISCONFIG --> CACHECLIENT[cache_client: Computed]
    end
    subgraph "Effects"
        EFFECTIVE --> LOGGER[config_logger: Effect]
        EFFECTIVE --> METRICS[metrics_updater: Effect]
    end
    classDef source fill:#4CAF50,color:#fff
    class ENV,FILE,REMOTE source
    style EFFECTIVE fill:#9C27B0,color:#fff
```

```python
import os
from reaktiv import Signal, Computed, Effect

class ServiceConfig:
    def __init__(self):
        # Base configuration sources
        self.env_config = Signal(os.environ.copy())
        self.file_config = Signal(load_config_file())
        self.remote_config = Signal({})  # Updated via API calls

        # Merged configuration with precedence
        self.effective_config = Computed(lambda: {
            **self.file_config(),
            **self.remote_config(),
            **self.env_config(),
        })

        # Service-specific configurations
        self.database_config = Computed(
            lambda: DatabaseConfig.from_dict(self.effective_config().get("database", {}))
        )
        self.redis_config = Computed(
            lambda: RedisConfig.from_dict(self.effective_config().get("redis", {}))
        )
        self.feature_flags = Computed(
            lambda: self.effective_config().get("features", {})
        )

        # Derived services
        self.db_pool = Computed(lambda: create_database_pool(self.database_config()))
        self.cache_client = Computed(lambda: create_redis_client(self.redis_config()))

        # Effects for configuration changes
        self._config_logger = Effect(
            lambda: logger.info(f"Config updated: {list(self.effective_config().keys())}")
        )
        self._metrics_updater = Effect(
            lambda: update_config_metrics(self.effective_config())
        )

    def update_remote_config(self, new_config):
        """Called by configuration service webhook"""
        self.remote_config.set(new_config)
        # Database pool, cache client, etc. automatically recreated
```
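A hypothetical usage sketch for the class above (it assumes the `load_config_file`, `DatabaseConfig`, and pool factories shown there exist in your codebase): one `set()` call replaces manual re-initialization of pools, clients, and flags.

```python
config = ServiceConfig()

# A webhook delivers new settings; everything downstream is derived
config.update_remote_config({"database": {"host": "db-2.internal"}})

pool = config.db_pool()  # rebuilt from the merged config on next access
flags = config.feature_flags()
```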
### Scenario 2: Real-Time Analytics Dashboard

```mermaid
graph TB
    subgraph "Data Sources"
        EVENTS[raw_events: Signal]
        SESSIONS[user_sessions: Signal]
        METRICS[system_metrics: Signal]
        TIMEWINDOW[time_window: Signal]
    end
    subgraph "Time Filtering"
        TIMEWINDOW --> CUTOFF[cutoff_time: Computed]
        EVENTS --> RECENT[recent_events: Computed]
        SESSIONS --> ACTIVE[active_sessions: Computed]
        CUTOFF --> RECENT
        CUTOFF --> ACTIVE
    end
    subgraph "Analytics"
        RECENT --> COUNTS[event_counts: Computed]
        COUNTS --> CONVERSION[conversion_rate: Computed]
        ACTIVE --> USERCOUNT[active_user_count: Computed]
        COUNTS --> DASHBOARD[dashboard_data: Computed]
        CONVERSION --> DASHBOARD
        USERCOUNT --> DASHBOARD
        METRICS --> DASHBOARD
        TIMEWINDOW --> DASHBOARD
    end
    subgraph "Effects"
        DASHBOARD --> WEBSOCKET[websocket_broadcaster: Effect]
        CONVERSION --> ALERTS[alert_monitor: Effect]
    end
    classDef source fill:#2196F3,color:#fff
    class EVENTS,SESSIONS,METRICS,TIMEWINDOW source
```

```python
# Real-time analytics implementation
import asyncio
import time
from reaktiv import Signal, Computed, Effect

class AnalyticsDashboard:
    def __init__(self, websocket):
        # Data sources
        self.raw_events = Signal([])
        self.time_window = Signal(60)  # Last 60 seconds

        # Computed metrics
        self.cutoff_time = Computed(lambda: time.time() - self.time_window())
        self.recent_events = Computed(
            lambda: [e for e in self.raw_events() if e["timestamp"] >= self.cutoff_time()]
        )
        self.event_counts = Computed(
            lambda: {
                "total": len(self.recent_events()),
                "by_type": self._count_by_type(self.recent_events()),
            }
        )

        # Dashboard data
        self.dashboard_data = Computed(
            lambda: {
                "counts": self.event_counts(),
                "window": self.time_window(),
                "updated_at": time.time(),
            }
        )

        async def dashboard_update():
            await self._send_dashboard_update(websocket, self.dashboard_data())

        # Effect to broadcast updates
        self._broadcaster = Effect(dashboard_update)

    def _count_by_type(self, events):
        result = {}
        for event in events:
            event_type = event.get("type", "unknown")
            result[event_type] = result.get(event_type, 0) + 1
        return result

    async def _send_dashboard_update(self, websocket, data):
        if websocket.open:
            await websocket.send_json(data)

    def add_event(self, event):
        self.raw_events.update(lambda events: events + [event])

    def change_time_window(self, seconds):
        self.time_window.set(seconds)
```
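A hypothetical driver for the dashboard above, assuming a connected `websocket` object with the `.open` and `.send_json` interface used there, and a running asyncio event loop for the async broadcaster effect:

```python
import asyncio
import time

async def main(websocket):
    dashboard = AnalyticsDashboard(websocket)

    # Feeding events only touches raw_events; the computed graph
    # decides what actually needs recalculating.
    dashboard.add_event({"type": "page_view", "timestamp": time.time()})
    dashboard.add_event({"type": "purchase", "timestamp": time.time()})

    # Widening the window recomputes cutoff_time, recent_events,
    # and dashboard_data; the broadcaster effect pushes the update.
    dashboard.change_time_window(300)

    await asyncio.sleep(0)  # yield so the async effect can run
```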
### Scenario 3: Distributed System Health Monitoring

```mermaid
graph TB
    subgraph "Raw Status Data"
        NODES[node_statuses: Signal]
        SERVICES[service_statuses: Signal]
    end
    subgraph "Cluster Health Metrics"
        NODES --> HEALTHY[healthy_nodes: Computed]
        HEALTHY --> CAPACITY[cluster_capacity: Computed]
        NODES --> LOAD[cluster_load: Computed]
        CAPACITY --> LOADPCT[load_percentage: Computed]
        LOAD --> LOADPCT
    end
    subgraph "Service Availability"
        SERVICES --> AVAILABILITY[service_availability: Computed]
        AVAILABILITY --> CRITICAL[critical_services: Computed]
    end
    subgraph "Automated Actions"
        HEALTHY --> LBUPDATE[load_balancer_updater: Effect]
        CRITICAL --> INCIDENT[alert_manager: Effect]
        LOADPCT --> SCALER[capacity_scaler: Effect]
    end
    style NODES fill:#2196F3,color:#fff
    style SERVICES fill:#2196F3,color:#fff
    style LBUPDATE fill:#FF9800,color:#fff
    style INCIDENT fill:#E91E63,color:#fff
    style SCALER fill:#9C27B0,color:#fff
```

```python
# Health monitoring implementation
from reaktiv import Signal, Computed, Effect

class ClusterMonitor:
    def __init__(self, alert_service, load_balancer):
        # Raw status data
        self.node_statuses = Signal({})  # node_id -> status

        # Derived metrics
        self.healthy_nodes = Computed(
            lambda: [
                node_id
                for node_id, status in self.node_statuses().items()
                if status["healthy"]
            ]
        )
        self.cluster_capacity = Computed(
            lambda: sum(
                status["capacity"]
                for status in self.node_statuses().values()
                if status["healthy"]
            )
        )
        self.cluster_load = Computed(
            lambda: sum(status["current_load"] for status in self.node_statuses().values())
        )
        self.load_percentage = Computed(
            lambda: (self.cluster_load() / self.cluster_capacity() * 100)
            if self.cluster_capacity() > 0
            else 100
        )

        # Effects for automated actions
        self._lb_updater = Effect(
            lambda: load_balancer.update_backends(self.healthy_nodes())
        )
        self._scaler = Effect(lambda: self._check_scaling_needs())

    def _check_scaling_needs(self):
        load_pct = self.load_percentage()
        if load_pct > 80:
            # Trigger scaling out
            print(f"High load detected ({load_pct:.1f}%), initiating scale out")
        elif load_pct < 20:
            # Scale in
            print(f"Low load detected ({load_pct:.1f}%), initiating scale in")

    def update_node_status(self, node_id, status):
        self.node_statuses.update(lambda statuses: {**statuses, node_id: status})
```

---

## Performance Considerations

### Fine-Grained Reactivity Visualization

```mermaid
graph LR
    subgraph "Traditional: Everything Recalculates"
        T1[Update Data] --> T2[Calculate Mean]
        T1 --> T3[Calculate Std Dev]
        T1 --> T4[Calculate Percentiles]
        T5[All run every time]
    end
    subgraph "Signals: Only Affected Parts Run"
        S1["data.set()"] --> S2["Check: mean accessed?"]
        S2 --> S3[Calculate mean only]
        S4[std_dev not accessed]
        S5[percentiles not accessed]
        S6[Lazy evaluation]
    end
    style T1 fill:#F44336,color:#fff
    style S1 fill:#4CAF50,color:#fff
    style T5 fill:#D32F2F,color:#fff
    style S6 fill:#388E3C,color:#fff
```

```python
# Optimizing computation with fine-grained signals
from reaktiv import Signal, Computed, Effect

# Dataset
data = Signal([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Expensive computations
mean = Computed(lambda: sum(data()) / len(data()) if data() else 0)

def calculate_std_dev(values, mean_value):
    if not values:
        return 0
    return (sum((x - mean_value) ** 2 for x in values) / len(values)) ** 0.5

std_dev = Computed(lambda: calculate_std_dev(data(), mean()))

# Efficient: Only accessed values are computed
def display_stats():
    # If we only access mean, std_dev won't be calculated
    print(f"Mean: {mean()}")

    # Conditional computation: Only calculate std_dev when needed
    if user_wants_detailed_stats():
        print(f"Standard Deviation: {std_dev()}")

stats_effect = Effect(display_stats)

# Adding data only triggers what's needed
data.update(lambda d: d + [11])
```
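You can observe the laziness directly with a call counter. This sketch assumes the pull-based evaluation described above: the computed does no work until something reads it, and intermediate values are never computed at all.

```python
from reaktiv import Signal, Computed

calls = {"count": 0}

def expensive(values):
    calls["count"] += 1
    return sorted(values)

data = Signal([3, 1, 2])
sorted_data = Computed(lambda: expensive(data()))

data.set([5, 4, 6])
data.set([9, 8, 7])
print(calls["count"])  # 0: nothing has read sorted_data yet
print(sorted_data())   # [7, 8, 9]
print(calls["count"])  # 1: one computation, for the latest value only
```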
### Memory Management Pattern

```mermaid
graph TB
    Signal --> WeakRef[Weak References to Dependents]
    subgraph "Problem: Effect GC"
        Effect1[Effect created] -.-> GC[Garbage Collected]
        GC --> Lost[Effect lost!]
    end
    subgraph "Solution: Retain Reference"
        Component[Component] --> EffectRef[self._effect]
        EffectRef --> Effect2[Effect retained]
    end
    style Effect1 fill:#F44336,color:#fff
    style Lost fill:#D32F2F,color:#fff
    style EffectRef fill:#4CAF50,color:#fff
    style Effect2 fill:#388E3C,color:#fff
```

```python
# Proper effect management in components
from reaktiv import Signal, Computed, Effect

class Component:
    def __init__(self):
        self.counter = Signal(0)

        # BAD: Effect not retained, will be garbage collected
        Effect(lambda: print(f"Counter: {self.counter()}"))

        # GOOD: Store a reference to the effect
        self._effect = Effect(lambda: print(f"Counter: {self.counter()}"))

    def increment(self):
        self.counter.update(lambda c: c + 1)

    def cleanup(self):
        # Optional: Explicitly dispose the effect when done
        self._effect.dispose()
```

---

## Migration Guide

### Migration Phases Visualization

```mermaid
graph TB
    subgraph "Phase 1: Identify Candidates"
        P1A[Manual State Sync]
        P1B[Observer Patterns]
        P1C[Cache Invalidation]
    end
    subgraph "Phase 2: Gradual Replacement"
        P2A[Replace Leaf Nodes with Signals]
        P2B[Add Computed Values for Derived State]
        P2C[Replace Side Effects with Effects]
    end
    subgraph "Phase 3: Remove Manual Coordination"
        P3A[Declarative Relationships]
        P3B[Automatic Updates]
        P3C[Simplified API]
    end
    P1A --> P2A
    P1B --> P2B
    P1C --> P2C
    P2A --> P3A
    P2B --> P3B
    P2C --> P3C
    classDef phase1 fill:#FF9800,color:#fff
    classDef phase2 fill:#4CAF50,color:#fff
    classDef phase3 fill:#2196F3,color:#fff
    class P1A,P1B,P1C phase1
    class P2A,P2B,P2C phase2
    class P3A,P3B,P3C phase3
```

### Before and After Architecture

```mermaid
graph LR
    subgraph "Before: Manual Coordination"
        OrderAdd["add_order()"] --> OrderList["orders.append()"]
        OrderList --> Revenue["total_revenue +="]
        Revenue --> Stats["update_daily_stats()"]
        Stats --> Notif["send_notifications()"]
        Notif --> Analytics["track_analytics()"]
        Error1[Forget a step?]
        Error2[Wrong order?]
        Error3[Race condition?]
    end
    subgraph "After: Declarative Relationships"
        OrderSignal[orders: Signal]
        OrderSignal --> RevenueComp[total_revenue: Computed]
        OrderSignal --> StatsComp[daily_stats: Computed]
        OrderSignal --> NotifEffect[notification_effect: Effect]
        OrderSignal --> AnalyticsEffect[analytics_effect: Effect]
        Success1[Relationships declared once]
        Success2[Automatic consistency]
        Success3[Easy to test]
    end
    classDef error fill:#D32F2F,color:#fff
    classDef success fill:#388E3C,color:#fff
    style OrderAdd fill:#F44336,color:#fff
    style OrderSignal fill:#4CAF50,color:#fff
    class Error1,Error2,Error3 error
    class Success1,Success2,Success3 success
```
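To ground the "after" side of that diagram, here is how the opening `OrderService` might look once migrated. This is a sketch trimmed to the core, not a drop-in replacement; `NotificationService` and `AnalyticsService` are the same stand-ins as in the original example.

```python
from reaktiv import Signal, Computed, Effect

class OrderService:
    def __init__(self):
        self.orders = Signal([])
        self.notification_service = NotificationService()
        self.analytics_service = AnalyticsService()

        # Derived state: declared once, always consistent
        self.total_revenue = Computed(
            lambda: sum(o.amount for o in self.orders())
        )
        self.daily_stats = Computed(lambda: self._stats_by_day(self.orders()))

        # Side effects live in Effects, not inside add_order
        self._notify = Effect(self._send_notifications)
        self._track = Effect(
            lambda: self.analytics_service.track_order(self.orders()[-1])
            if self.orders() else None
        )

    def add_order(self, order):
        # The only entry point; no manual coordination afterwards
        self.orders.update(lambda orders: orders + [order])

    def _stats_by_day(self, orders):
        stats = {}
        for o in orders:
            day = stats.setdefault(o.created_at.date(), {"count": 0, "revenue": 0.0})
            day["count"] += 1
            day["revenue"] += o.amount
        return stats

    def _send_notifications(self):
        orders = self.orders()
        if orders and orders[-1].amount > 1000:
            self.notification_service.send_high_value_alert(orders[-1])
```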
---

## Conclusion

Signals represent a fundamental shift from imperative to declarative state management. They're not just "reactive variables"; they're a way to express complex state relationships that automatically maintain consistency.

### The Signal Advantage

```mermaid
graph LR
    subgraph "Traditional Challenges"
        TC1[Manual Coordination]
        TC2[Implicit Dependencies]
        TC3[Inconsistent State]
        TC4[Testing Complexity]
        TC5[Performance Blind Spots]
    end
    subgraph "Signal Solutions"
        SS1[Automatic Updates]
        SS2[Explicit Relationships]
        SS3[Always Consistent]
        SS4[Isolated Testing]
        SS5[Fine-grained Reactivity]
    end
    TC1 --> SS1
    TC2 --> SS2
    TC3 --> SS3
    TC4 --> SS4
    TC5 --> SS5
    classDef challenge fill:#F44336,color:#fff
    classDef solution fill:#4CAF50,color:#fff
    class TC1,TC2,TC3,TC4,TC5 challenge
    class SS1,SS2,SS3,SS4,SS5 solution
```

The key insight is that most state management bugs come from forgetting to update something when related state changes. Signals eliminate this entire class of bugs by making relationships explicit and automatic.

Start small: identify one area of your codebase where you manually coordinate state updates. Replace it with signals and experience the difference. Once you see how much cleaner and more reliable it makes your code, you'll start seeing signal opportunities everywhere.

Remember: signals are a tool, not a religion. Use them where they add value (complex derived state, cross-cutting concerns, real-time data flows). Skip them for simple, linear transformations.