Watching Read Models
The IReadModels API provides the Watch<TReadModel>() method to observe real-time changes to read models. This is useful for building reactive user interfaces, triggering background jobs, or maintaining synchronized caches based on read model updates.
Tip
For information on how to define and configure read models in Arc, see Read Models.
Overview
When you watch a read model using Watch<T>, Chronicle:
- Returns an IObservable<ReadModelChangeset<T>> that emits changes as they occur
- Observes changes from both projections and reducers
- Provides detailed information about what changed and why
- Allows you to react immediately to state transitions
This enables reactive patterns where your application can respond instantly to read model changes without polling.
Basic Usage
Setting Up Observation
Create an observer for a read model type:
public class NotificationService
{
readonly IEventStore _eventStore;
IDisposable? _subscription; // Keep the subscription so it can be disposed later
public NotificationService(IEventStore eventStore)
{
_eventStore = eventStore;
}
public void WatchOrders()
{
var observable = _eventStore.ReadModels.Watch<Order>();
_subscription = observable.Subscribe(
changeset =>
{
Console.WriteLine($"Order {changeset.ReadModelKey} changed");
Console.WriteLine($"New state: {changeset.Current}");
},
error => Console.WriteLine($"Error: {error}"),
() => Console.WriteLine("Watch completed")
);
}
}
The ReadModelChangeset Structure
Each changeset emitted by the observable contains:
public record ReadModelChangeset<TReadModel>(
ReadModelKey ReadModelKey, // The key of the changed instance
TReadModel Current, // The current state of the instance
TReadModel? Previous, // The previous state (null on first change)
IEnumerable<AppendedEvent> Events, // Events that caused this change
CorrelationId CorrelationId); // Links related changes together
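Because Previous is null on the first change, a subscriber can tell a newly created instance from an update. The sketch below illustrates the idea with stand-in types: `Changeset<T>`, `ChangeKind` and the small `Order` record are hypothetical and only mirror the shape of the record above, they are not Chronicle types.

```csharp
using System;

// Hypothetical stand-in that mirrors only the key/Current/Previous part of
// ReadModelChangeset<T>; it is not the Chronicle type.
public record Changeset<T>(string Key, T Current, T? Previous) where T : class;

public enum ChangeKind { Created, Updated }

public static class ChangesetExtensions
{
    // A null Previous means this is the first change seen for the instance.
    public static ChangeKind Kind<T>(this Changeset<T> changeset) where T : class =>
        changeset.Previous is null ? ChangeKind.Created : ChangeKind.Updated;
}

// Hypothetical read model used only for this illustration.
public record Order(string Status);

public static class Demo
{
    public static void Main()
    {
        var created = new Changeset<Order>("order-1", new Order("New"), null);
        var updated = new Changeset<Order>("order-1", new Order("Shipped"), new Order("New"));

        Console.WriteLine(created.Kind()); // Created
        Console.WriteLine(updated.Kind()); // Updated
    }
}
```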
Usage Patterns
WebSocket Updates for Real-Time UI
Push real-time updates to connected clients:
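// Note: SignalR hubs are transient (a new instance per invocation), so in a
// real application the subscription would typically live in a long-lived
// service that sends through IHubContext<OrderWatchHub>.
public class OrderWatchHub : Hub
{
readonly IEventStore _eventStore;
IDisposable? _subscription; // Not readonly: assigned in StartWatchingOrders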
public OrderWatchHub(IEventStore eventStore)
{
_eventStore = eventStore;
}
public async Task StartWatchingOrders()
{
var observable = _eventStore.ReadModels.Watch<Order>();
_subscription = observable.Subscribe(
async changeset =>
{
await Clients.All.SendAsync("OrderChanged", new
{
orderId = changeset.ReadModelKey,
order = changeset.Current,
previousState = changeset.Previous
});
},
error => Console.WriteLine($"Error: {error}")
);
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
_subscription?.Dispose();
await base.OnDisconnectedAsync(exception);
}
}
Caching Updates
Keep an in-memory cache synchronized with read model changes:
public class CachedReadModelService
{
readonly IEventStore _eventStore;
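// Thread-safe (System.Collections.Concurrent): written by the subscription, read by callers
readonly ConcurrentDictionary<ReadModelKey, Account> _cache = new();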
IDisposable? _subscription;
public void StartCacheSync()
{
var observable = _eventStore.ReadModels.Watch<Account>();
_subscription = observable.Subscribe(
changeset =>
{
_cache[changeset.ReadModelKey] = changeset.Current;
Console.WriteLine($"Cache updated: {changeset.ReadModelKey}");
},
error => Console.WriteLine($"Cache sync error: {error}")
);
}
public Account? GetFromCache(Guid accountId)
{
_cache.TryGetValue(accountId, out var account);
return account;
}
public void StopCacheSync()
{
_subscription?.Dispose();
}
}
Background Job Triggering
Automatically trigger processing when read models change:
public class BackgroundJobService
{
readonly IEventStore _eventStore;
readonly IJobQueue _jobQueue;
public void WatchAndProcess()
{
var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe(
async changeset =>
{
// Only process completed orders
if (changeset.Current.Status == OrderStatus.Completed)
{
await _jobQueue.Enqueue(new GenerateInvoiceJob
{
OrderId = changeset.ReadModelKey,
Order = changeset.Current
});
}
}
);
}
}
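The handler above is an async lambda passed to Subscribe, which compiles to an async void method: nothing awaits it, so a failure inside the handler never reaches the subscription's error callback. One possible alternative, sketched here with BCL types only, is to let the observer write into a System.Threading.Channels channel and run the async work in a single awaited consumer loop. `ChannelObserver<T>` is a hypothetical helper name, not part of Chronicle.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: forwards every observed item into a channel so that
// async work happens in one awaited consumer loop instead of async void.
public sealed class ChannelObserver<T> : IObserver<T>
{
    readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    public void OnNext(T value) => _channel.Writer.TryWrite(value);

    // Completing the writer with the error makes the consumer loop throw it.
    public void OnError(Exception error) => _channel.Writer.TryComplete(error);

    public void OnCompleted() => _channel.Writer.TryComplete();

    // Processes items one at a time, in arrival order. Exceptions from the
    // handler surface on the returned Task, where the caller can await or log them.
    public async Task ConsumeAsync(Func<T, Task> handler)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync())
        {
            await handler(item);
        }
    }
}
```

To use it, pass an instance to Subscribe on the observable returned by Watch<Order>() and await ConsumeAsync from a long-running task, for example inside a hosted service. The channel here is unbounded, which is an assumption that the consumer keeps up with the rate of change.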
State Transition Notifications
React to specific state changes:
public class StateChangeNotifier
{
readonly IEventStore _eventStore;
public void WatchForStateChanges()
{
var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe(changeset =>
{
var previous = changeset.Previous;
var current = changeset.Current;
// Order shipped
if (previous?.Status != OrderStatus.Shipped && current.Status == OrderStatus.Shipped)
{
NotifyCustomerOfShipment(changeset.ReadModelKey, current);
}
// Payment received (a missing previous state counts as zero paid)
if ((previous?.TotalPaid ?? 0m) < current.TotalPaid)
{
NotifyOfPayment(changeset.ReadModelKey, current.TotalPaid - (previous?.TotalPaid ?? 0m));
}
// Order completed
if (previous?.Status != OrderStatus.Completed && current.Status == OrderStatus.Completed)
{
SendFeedbackRequest(changeset.ReadModelKey);
}
});
}
}
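The status checks above share one shape: a condition did not hold for the previous state (or there was no previous state) and does hold for the current one. A small helper can express that once. This is only a sketch; `Transitions.Became` is a hypothetical name, and the `Order` record and `OrderStatus` enum are minimal stand-ins for the read model.

```csharp
using System;

public static class Transitions
{
    // True when the predicate holds for the current state but did not hold
    // for the previous one. A null previous state counts as "did not hold".
    public static bool Became<T>(T? previous, T current, Func<T, bool> predicate)
        where T : class =>
        predicate(current) && (previous is null || !predicate(previous));
}

// Minimal stand-ins for illustration only.
public enum OrderStatus { New, Shipped, Completed }
public record Order(OrderStatus Status, decimal TotalPaid);

public static class Demo
{
    public static void Main()
    {
        var placed = new Order(OrderStatus.New, 0m);
        var shipped = new Order(OrderStatus.Shipped, 0m);

        // First change: no previous state, and the order is not shipped yet.
        Console.WriteLine(Transitions.Became(null, placed, o => o.Status == OrderStatus.Shipped)); // False
        // New -> Shipped is a transition.
        Console.WriteLine(Transitions.Became(placed, shipped, o => o.Status == OrderStatus.Shipped)); // True
        // Shipped -> Shipped is not.
        Console.WriteLine(Transitions.Became(shipped, shipped, o => o.Status == OrderStatus.Shipped)); // False
    }
}
```

Checking for the transition rather than the current value alone also avoids repeating a side effect when an instance changes again while it stays in the same status.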
Logging and Audit Trail
Log all read model changes for auditing:
public class AuditLogger
{
readonly IEventStore _eventStore;
readonly ILogger<AuditLogger> _logger;
public void LogAllChanges()
{
var observable = _eventStore.ReadModels.Watch<Order>();
observable.Subscribe(changeset =>
{
_logger.LogInformation(
"Order changed: Key={OrderKey}, Previous={Previous}, Current={Current}, CorrelationId={CorrelationId}",
changeset.ReadModelKey,
changeset.Previous,
changeset.Current,
changeset.CorrelationId
);
});
}
}
Filtering and Transforming
Simplifying with ToObservableReadModel()
When you only need the read model instances themselves and don't care about the changeset metadata (like the key, previous state, or events), use the .ToObservableReadModel() extension method to simplify your subscription:
public class SimpleWatcher
{
readonly IEventStore _eventStore;
public void WatchOrders()
{
// ToObservableReadModel() filters out removed items and emits only the read model instances
var observable = _eventStore.ReadModels.Watch<Order>()
.ToObservableReadModel();
observable.Subscribe(
order =>
{
// Receive the Order instance directly, not the full changeset
Console.WriteLine($"Order status: {order.Status}");
Console.WriteLine($"Amount: {order.TotalAmount}");
},
error => Console.WriteLine($"Error: {error}"),
() => Console.WriteLine("Watch completed")
);
}
}
This extension method:
- Automatically filters out removed read models
- Skips null read model instances
- Returns an ISubject<TReadModel> instead of IObservable<ReadModelChangeset<TReadModel>>
- Propagates errors and completion signals
This is particularly useful when building reactive UIs where you want to bind directly to the read model state without dealing with changeset structures.
Using LINQ Operators
Because Watch<T>() returns an observable, the Rx LINQ operators such as Where and Select can filter and project changesets before they reach your subscriber:
public class FilteredWatcher
{
readonly IEventStore _eventStore;
public void WatchHighValueOrders()
{
var observable = _eventStore.ReadModels.Watch<Order>()
.Where(changeset => changeset.Current.TotalAmount > 1000m)
.Select(changeset => new
{
OrderId = changeset.ReadModelKey,
Amount = changeset.Current.TotalAmount,
Status = changeset.Current.Status
});
observable.Subscribe(order =>
{
Console.WriteLine($"High-value order: {order.OrderId} with amount {order.Amount}");
});
}
}
Debouncing Rapid Changes
Handle multiple rapid changes as a single operation:
public class ThrottledWatcher
{
readonly IEventStore _eventStore;
public void WatchWithDebounce()
{
var subscription = _eventStore.ReadModels.Watch<ShoppingCart>()
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe(changeset =>
{
// Runs once changes have paused for 500 ms; only the latest changeset is delivered
UpdateUI(changeset);
});
}
}
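In Rx.NET, Throttle behaves as a debounce: it forwards a value only after the source has been quiet for the given duration, so a source that never pauses delivers nothing. When the goal is "at most one update per interval", Sample may be a better fit. The sketch below compares the two in virtual time. It assumes the System.Reactive package and uses a Subject<int> as a stand-in for Watch<T>(); `ThrottleVersusSample` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleVersusSample
{
    // Pushes ten values 100 ms apart in virtual time, then goes quiet.
    // Returns what each operator delivered during the burst and what
    // Throttle had delivered once the source had paused.
    public static (int[] ThrottledDuringBurst, int[] SampledDuringBurst, int[] ThrottledAfterPause) Run()
    {
        var scheduler = new HistoricalScheduler(DateTimeOffset.UnixEpoch); // virtual clock, no real waiting
        var source = new Subject<int>();                                   // stand-in for Watch<T>()
        var throttled = new List<int>();
        var sampled = new List<int>();
        var window = TimeSpan.FromMilliseconds(500);

        using var throttleSubscription = source.Throttle(window, scheduler).Subscribe(throttled.Add);
        using var sampleSubscription = source.Sample(window, scheduler).Subscribe(sampled.Add);

        for (var i = 1; i <= 10; i++)
        {
            source.OnNext(i);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
        }

        var throttledDuringBurst = throttled.ToArray();
        var sampledDuringBurst = sampled.ToArray();

        scheduler.AdvanceBy(window); // the source now pauses for 500 ms
        return (throttledDuringBurst, sampledDuringBurst, throttled.ToArray());
    }
}
```

During the burst the throttled list is expected to stay empty, because the source never pauses for 500 ms, while the sampled list receives the most recent value at each interval. After the pause, Throttle delivers the last value only.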
Error Handling
Recovering from Errors
Handle errors gracefully and optionally retry:
public class ResilientWatcher
{
readonly IEventStore _eventStore;
readonly ILogger<ResilientWatcher> _logger;
public void WatchWithErrorHandling()
{
var observable = _eventStore.ReadModels.Watch<Account>()
.Retry(3) // Resubscribe on error, up to 3 attempts in total
.Catch<ReadModelChangeset<Account>, Exception>(
error =>
{
_logger.LogError(error, "Watch error, falling back to polling");
return StartPollingFallback();
}
);
observable.Subscribe(
changeset => ProcessChangeset(changeset),
error => _logger.LogError(error, "Watch failed after retries")
);
}
}
Performance Considerations
Memory Usage
An active subscription keeps its observer, and anything the observer captures, reachable. Always dispose subscriptions when done:
public class ManagedWatcher : IDisposable
{
IDisposable? _subscription;
public void Start()
{
_subscription = _eventStore.ReadModels.Watch<Order>()
.Subscribe(ProcessChangeset);
}
public void Dispose()
{
_subscription?.Dispose();
}
}
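When a component holds several subscriptions, tracking each in its own field becomes error-prone. A possible approach, assuming the System.Reactive package, is to collect them in a CompositeDisposable and dispose the group as a whole. `SubscriptionGroup` is a hypothetical wrapper name used only for this sketch.

```csharp
using System;
using System.Reactive.Disposables;

// Sketch: one owner for several subscriptions.
public sealed class SubscriptionGroup : IDisposable
{
    readonly CompositeDisposable _subscriptions = new();

    // Registers a subscription so that it is disposed together with the group.
    public void Track(IDisposable subscription) => _subscriptions.Add(subscription);

    // Disposes every tracked subscription.
    public void Dispose() => _subscriptions.Dispose();
}

public static class Demo
{
    public static void Main()
    {
        var disposed = 0;
        var group = new SubscriptionGroup();

        // Disposable.Create stands in for the IDisposable returned by Subscribe.
        group.Track(Disposable.Create(() => disposed++));
        group.Track(Disposable.Create(() => disposed++));

        group.Dispose();
        Console.WriteLine(disposed); // 2
    }
}
```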
Selective Watching
Only watch the read models you need to avoid unnecessary processing:
// Good - watch only what you need
var orderWatch = _eventStore.ReadModels.Watch<Order>();
// Avoid - watching everything
var allWatches = new[]
{
_eventStore.ReadModels.Watch<Order>(),
_eventStore.ReadModels.Watch<Customer>(),
_eventStore.ReadModels.Watch<Inventory>(),
_eventStore.ReadModels.Watch<Shipment>()
};
Use Cases
Real-Time Dashboards
Update dashboard widgets when data changes:
public class DashboardHub : Hub
{
readonly DashboardWatcher _watcher;
public async Task SubscribeToDashboard(Guid userId)
{
_watcher.WatchAccountSummary(userId).Subscribe(
async changeset =>
{
await Clients.User(userId.ToString())
.SendAsync("DashboardUpdated", changeset.Current);
}
);
}
}
Event-Driven Architecture
Coordinate actions across multiple systems:
public class OrderOrchestrator
{
public void SetupWorkflow()
{
// When an order changes status, trigger appropriate actions
_eventStore.ReadModels.Watch<Order>()
.Subscribe(changeset =>
{
switch (changeset.Current.Status)
{
case OrderStatus.New:
_inventoryService.ReserveItems(changeset.Current);
break;
case OrderStatus.Confirmed:
_paymentService.ProcessPayment(changeset.Current);
break;
case OrderStatus.Shipped:
_notificationService.SendShipmentNotice(changeset.Current);
break;
}
});
}
}
Live Collaboration Features
Sync state across multiple users in real-time:
public class CollaborativeWorkspace
{
public void SyncChanges()
{
_eventStore.ReadModels.Watch<Document>()
.Subscribe(changeset =>
{
// Broadcast to all users editing this document
BroadcastToUsers(changeset.Current.DocumentId, changeset.Current);
});
}
}
Best Practices
Always Dispose Subscriptions
Memory leaks can occur if subscriptions aren't disposed:
// Good - disposed automatically when the enclosing scope exits (the watch ends there too)
using var subscription = _eventStore.ReadModels.Watch<Order>().Subscribe(/*...*/);
// Also good
IDisposable? _subscription;
public void Start() => _subscription = _eventStore.ReadModels.Watch<Order>().Subscribe(/*...*/);
public void Stop() => _subscription?.Dispose();
Use Appropriate Threading Models
Be aware of which thread your observer runs on:
public void WatchWithUI()
{
_eventStore.ReadModels.Watch<Order>()
.ObserveOn(SynchronizationContext.Current!) // Deliver on the context captured here; call this from the UI thread
.Subscribe(changeset => UpdateUI(changeset));
}
Handle Backpressure
If processing is slow, manage incoming changes:
public void WatchWithBackpressure()
{
_eventStore.ReadModels.Watch<Order>()
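.Buffer(TimeSpan.FromSeconds(1)) // Collect changes into one-second batches
.Where(batch => batch.Count > 0) // Skip windows in which nothing changed
.Subscribe(batch => ProcessBatch(batch)); // ProcessBatch: your handler for a whole batch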
}
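A buffered batch may contain several changes for the same instance. If only the latest state matters, which is an assumption that does not hold for audit-style consumers, the batch can be reduced to one entry per key before the slow work starts. The sketch below uses a hypothetical `Change` record as a stand-in for the key and current state of a changeset.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the key and current state of a changeset.
public record Change(string Key, int Version);

public static class Batches
{
    // Keeps only the last change per key. Keys appear in the order in which
    // they were first seen in the batch.
    public static IReadOnlyList<Change> LatestPerKey(IEnumerable<Change> batch) =>
        batch.GroupBy(change => change.Key)
             .Select(group => group.Last())
             .ToList();
}

public static class Demo
{
    public static void Main()
    {
        var batch = new[]
        {
            new Change("a", 1),
            new Change("b", 1),
            new Change("a", 2),
            new Change("a", 3)
        };

        foreach (var change in Batches.LatestPerKey(batch))
        {
            Console.WriteLine($"{change.Key}:{change.Version}");
        }
        // Prints:
        // a:3
        // b:1
    }
}
```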
Related Topics
- Getting a Single Instance - Query current read model state
- Getting a Collection of Instances - Query all instances
- Getting Snapshots - Retrieve historical state snapshots
- Projections - Learn more about read model projections
- Reactors - Event processing with reactors