Watching Read Models
The IReadModels API provides the Watch<TReadModel>() method to observe real-time changes to read models. This is useful for building reactive user interfaces, triggering background jobs, or maintaining synchronized caches based on read model updates.
Overview
Section titled “Overview”When you watch a read model using Watch<T>, Chronicle:
- Returns an
IObservable<ReadModelChangeset<T>>that emits changes as they occur - Observes changes from both projections and reducers
- Provides detailed information about what changed and why
- Allows you to react immediately to state transitions
This enables reactive patterns where your application can respond instantly to read model changes without polling.
Basic Usage
Section titled “Basic Usage”Setting Up Observation
Section titled “Setting Up Observation”Create an observer for a read model type:
public class NotificationService{ readonly IEventStore _eventStore;
public NotificationService(IEventStore eventStore) { _eventStore = eventStore; }
public void WatchOrders() { var observable = _eventStore.ReadModels.Watch<Order>();
subscription = observable.Subscribe( changeset => { Console.WriteLine($"Order {changeset.ModelKey} changed"); Console.WriteLine($"New state: {changeset.ReadModel}"); }, error => Console.WriteLine($"Error: {error}"), () => Console.WriteLine("Watch completed") ); }}The ReadModelChangeset Structure
Section titled “The ReadModelChangeset Structure”Each changeset emitted by the observable contains:
public record ReadModelChangeset<TReadModel>( EventStoreNamespaceName Namespace, // The namespace for the event store ReadModelKey ModelKey, // The key of the changed instance TReadModel? ReadModel, // The current state of the instance (null when removed) bool Removed); // Whether the read model was removedUsage Patterns
Section titled “Usage Patterns”WebSocket Updates for Real-Time UI
Section titled “WebSocket Updates for Real-Time UI”Push real-time updates to connected clients:
public class OrderWatchHub : Hub{ readonly IEventStore _eventStore; readonly IDisposable _subscription;
public OrderWatchHub(IEventStore eventStore) { _eventStore = eventStore; }
public async Task StartWatchingOrders() { var observable = _eventStore.ReadModels.Watch<Order>();
_subscription = observable.Subscribe( async changeset => { await Clients.All.SendAsync("OrderChanged", new { orderId = changeset.ModelKey, order = changeset.ReadModel, removed = changeset.Removed }); }, error => Console.WriteLine($"Error: {error}") ); }
public override async Task OnDisconnectedAsync(Exception? exception) { _subscription?.Dispose(); await base.OnDisconnectedAsync(exception); }}Caching Updates
Section titled “Caching Updates”Keep an in-memory cache synchronized with read model changes:
public class CachedReadModelService{ readonly IEventStore _eventStore; readonly Dictionary<ReadModelKey, Account> _cache = new(); IDisposable? _subscription;
public void StartCacheSync() { var observable = _eventStore.ReadModels.Watch<Account>();
_subscription = observable.Subscribe( changeset => { if (changeset.Removed) { _cache.Remove(changeset.ModelKey); } else if (changeset.ReadModel is not null) { _cache[changeset.ModelKey] = changeset.ReadModel; }
Console.WriteLine($"Cache updated: {changeset.ModelKey}"); }, error => Console.WriteLine($"Cache sync error: {error}") ); }
public Account? GetFromCache(Guid accountId) { _cache.TryGetValue(accountId, out var account); return account; }
public void StopCacheSync() { _subscription?.Dispose(); }}Background Job Triggering
Section titled “Background Job Triggering”Automatically trigger processing when read models change:
public class BackgroundJobService{ readonly IEventStore _eventStore; readonly IJobQueue _jobQueue;
public void WatchAndProcess() { var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe( async changeset => { // Only process completed orders if (changeset.ReadModel?.Status == OrderStatus.Completed) { await _jobQueue.Enqueue(new GenerateInvoiceJob { OrderId = changeset.ModelKey, Order = changeset.ReadModel }); } } ); }}State Transition Notifications
Section titled “State Transition Notifications”React to specific states as the read model reaches them:
public class StateChangeNotifier{ readonly IEventStore _eventStore;
public void WatchForStateChanges() { var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe(changeset => { if (changeset.Removed || changeset.ReadModel is null) { return; }
var current = changeset.ReadModel;
// Order shipped if (current.Status == OrderStatus.Shipped) { NotifyCustomerOfShipment(changeset.ModelKey, current); }
// Order completed if (current.Status == OrderStatus.Completed) { SendFeedbackRequest(changeset.ModelKey); } }); }}The changeset carries the current state of the read model, not its previous state. If you need to react to a transition (the difference between two states), track the last value you saw per ModelKey yourself, or derive the transition from a dedicated event using a reactor.
Logging and Audit Trail
Section titled “Logging and Audit Trail”Log all read model changes for auditing:
public class AuditLogger{ readonly IEventStore _eventStore; readonly ILogger<AuditLogger> _logger;
public void LogAllChanges() { var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe(changeset => { _logger.LogInformation( "Order changed: Key={OrderKey}, Current={Current}, Removed={Removed}", changeset.ModelKey, changeset.ReadModel, changeset.Removed ); }); }}Filtering and Transforming
Section titled “Filtering and Transforming”Simplifying with ToObservableReadModel()
Section titled “Simplifying with ToObservableReadModel()”When you only need the read model instances themselves and don’t care about the changeset metadata (like the key, previous state, or events), use the .ToObservableReadModel() extension method to simplify your subscription:
public class SimpleWatcher{ readonly IEventStore _eventStore;
public void WatchOrders() { // ToObservableReadModel() filters out removed items and emits only the read model instances var observable = _eventStore.ReadModels.Watch<Order>() .ToObservableReadModel();
observable.Subscribe( order => { // Receive the Order instance directly, not the full changeset Console.WriteLine($"Order status: {order.Status}"); Console.WriteLine($"Amount: {order.TotalAmount}"); }, error => Console.WriteLine($"Error: {error}"), () => Console.WriteLine("Watch completed") ); }}This extension method:
- Automatically filters out removed read models
- Skips null read model instances
- Returns an
ISubject<TReadModel>instead ofIObservable<ReadModelChangeset<TReadModel>> - Propagates errors and completion signals
This is particularly useful when building reactive UIs where you want to bind directly to the read model state without dealing with changeset structures.
Using LINQ Operators
Section titled “Using LINQ Operators”LINQ operators work on observables for advanced filtering:
public class FilteredWatcher{ readonly IEventStore _eventStore;
public void WatchHighValueOrders() { var observable = _eventStore.ReadModels.Watch<Order>() .Where(changeset => changeset.ReadModel?.TotalAmount > 1000m) .Select(changeset => new { OrderId = changeset.ModelKey, Amount = changeset.ReadModel!.TotalAmount, Status = changeset.ReadModel!.Status });
observable.Subscribe(order => { Console.WriteLine($"High-value order: {order.OrderId} with amount {order.Amount}"); }); }}Debouncing Rapid Changes
Section titled “Debouncing Rapid Changes”Handle multiple rapid changes as a single operation:
public class TrottledWatcher{ readonly IEventStore _eventStore;
public void WatchWithDebounce() { var observable = _eventStore.ReadModels.Watch<ShoppingCart>() .Throttle(TimeSpan.FromMilliseconds(500)) .Subscribe(changeset => { // Update UI only once per 500ms, combining rapid changes UpdateUI(changeset); }); }}Error Handling
Section titled “Error Handling”Recovering from Errors
Section titled “Recovering from Errors”Handle errors gracefully and optionally retry:
public class ResilientWatcher{ readonly IEventStore _eventStore;
public void WatchWithErrorHandling() { var observable = _eventStore.ReadModels.Watch<Account>() .Retry(3) // Automatically retry 3 times on error .Catch<ReadModelChangeset<Account>, Exception>( error => { _logger.LogError(error, "Watch error, falling back to polling"); return StartPollingFallback(); } );
observable.Subscribe( changeset => ProcessChangeset(changeset), error => _logger.LogError(error, "Watch failed after retries") ); }}Performance Considerations
Section titled “Performance Considerations”Memory Usage
Section titled “Memory Usage”Each subscription holds a reference to keep the observable alive. Always dispose subscriptions when done:
public class ManagedWatcher : IDisposable{ IDisposable? _subscription;
public void Start() { _subscription = _eventStore.ReadModels.Watch<Order>() .Subscribe(ProcessChangeset); }
public void Dispose() { _subscription?.Dispose(); }}Selective Watching
Section titled “Selective Watching”Only watch the read models you need to avoid unnecessary processing:
// Good - watch only what you needvar orderWatch = _eventStore.ReadModels.Watch<Order>();
// Avoid - watching everythingvar allWatches = new[]{ _eventStore.ReadModels.Watch<Order>(), _eventStore.ReadModels.Watch<Customer>(), _eventStore.ReadModels.Watch<Inventory>(), _eventStore.ReadModels.Watch<Shipment>()};Use Cases
Section titled “Use Cases”Real-Time Dashboards
Section titled “Real-Time Dashboards”Update dashboard widgets when data changes:
public class DashboardHub : Hub{ readonly DashboardWatcher _watcher;
public async Task SubscribeToDashboard(Guid userId) { _watcher.WatchAccountSummary(userId).Subscribe( async changeset => { await Clients.User(userId.ToString()) .SendAsync("DashboardUpdated", changeset.ReadModel); } ); }}Event-Driven Architecture
Section titled “Event-Driven Architecture”Coordinate actions across multiple systems:
public class OrderOrchestrator{ public void SetupWorkflow() { // When an order changes status, trigger appropriate actions _eventStore.ReadModels.Watch<Order>() .Subscribe(changeset => { if (changeset.ReadModel is null) { return; }
switch (changeset.ReadModel.Status) { case OrderStatus.New: _inventoryService.ReserveItems(changeset.ReadModel); break; case OrderStatus.Confirmed: _paymentService.ProcessPayment(changeset.ReadModel); break; case OrderStatus.Shipped: _notificationService.SendShipmentNotice(changeset.ReadModel); break; } }); }}Live Collaboration Features
Section titled “Live Collaboration Features”Sync state across multiple users in real-time:
public class CollaborativeWorkspace{ public void SyncChanges() { _eventStore.ReadModels.Watch<Document>() .Subscribe(changeset => { if (changeset.ReadModel is null) { return; }
// Broadcast to all users editing this document BroadcastToUsers(changeset.ReadModel.DocumentId, changeset.ReadModel); }); }}Best Practices
Section titled “Best Practices”Always Dispose Subscriptions
Section titled “Always Dispose Subscriptions”Memory leaks can occur if subscriptions aren’t disposed:
// Goodusing var subscription = _eventStore.ReadModels.Watch<Order>().Subscribe(/*...*/);
// Also goodIDisposable? _subscription;
public void Start() => _subscription = _eventStore.ReadModels.Watch<Order>().Subscribe(/*...*/);public void Stop() => _subscription?.Dispose();Use Appropriate Threading Models
Section titled “Use Appropriate Threading Models”Be aware of which thread your observer runs on:
public void WatchWithUI(){ _eventStore.ReadModels.Watch<Order>() .ObserveOn(SynchronizationContext.Current) // Switch to UI thread .Subscribe(changeset => UpdateUI(changeset));}Handle Backpressure
Section titled “Handle Backpressure”If processing is slow, manage incoming changes:
public void WatchWithBackpressure(){ _eventStore.ReadModels.Watch<Order>() .Buffer(TimeSpan.FromSeconds(1)) // Batch changes .SelectMany(batch => batch) // Process batch .Subscribe(ProcessSlowly);}Related Topics
Section titled “Related Topics”- Getting a Single Instance - Query current read model state
- Getting a Collection of Instances - Query all instances
- Getting Snapshots - Retrieve historical state snapshots
- Projections - Learn more about read model projections
- Reactors - Event processing with reactors