Observing Collections
The MongoDB extensions in Cratis Applications support reactive programming through collection observation. You can create observables that notify subscribers whenever documents in a MongoDB collection change, giving your application real-time updates.
Overview
Collection observation leverages MongoDB’s Change Streams feature combined with Reactive Extensions (Rx.NET) to provide a seamless way to watch for changes in your data. The system automatically handles initial data loading, change detection, and notification of observers.
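To make the combination of initial load and change detection concrete, here is a minimal conceptual sketch built directly on the MongoDB .NET driver and Rx.NET. It is not the Cratis implementation: the `ObservationSketch` class and `ObserveSnapshots` method are hypothetical names, and re-running the query on every change is a simplification chosen for clarity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using MongoDB.Driver;

public static class ObservationSketch
{
    // Hypothetical helper (not part of Cratis): emits a full snapshot on subscribe,
    // then a fresh snapshot each time the change stream reports activity.
    public static IObservable<IReadOnlyList<T>> ObserveSnapshots<T>(
        IMongoCollection<T> collection,
        FilterDefinition<T> filter)
    {
        return Observable.Create<IReadOnlyList<T>>(async (observer, cancellationToken) =>
        {
            // Open the change stream before the initial query so that changes
            // made while the first snapshot loads are not missed.
            using var cursor = await collection.WatchAsync(cancellationToken: cancellationToken);

            // Initial data load.
            observer.OnNext(await collection.Find(filter).ToListAsync(cancellationToken));

            // Change detection: re-query whenever a batch contains at least one change.
            while (await cursor.MoveNextAsync(cancellationToken))
            {
                if (cursor.Current.Any())
                {
                    observer.OnNext(await collection.Find(filter).ToListAsync(cancellationToken));
                }
            }
        });
    }
}
```

Disposing the subscription cancels the token passed to the async delegate, which stops the loop and closes the cursor.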
Key Features
- Real-time Updates: Automatically receive notifications when documents change
- Filtering Support: Observe only documents matching specific criteria
- Multiple Observation Types: Observe collections, single documents, or documents by ID
- Query Context Awareness: Integrates with Cratis query context for paging and sorting
- Automatic Cleanup: Proper resource management and cleanup when observations are disposed
Basic Collection Observation
Observing All Documents

    public class AuthorService
    {
        private readonly IMongoCollection<Author> _collection;

        public AuthorService(IMongoCollection<Author> collection)
        {
            _collection = collection;
        }

        // Emits the full set of authors, then again whenever the collection changes.
        public IObservable<IEnumerable<Author>> ObserveAllAuthors()
        {
            return _collection.Observe();
        }
    }

Observing with Filter (Expression)
    public IObservable<IEnumerable<Author>> ObserveActiveAuthors()
    {
        return _collection.Observe(author => author.IsActive);
    }

Observing with Filter Definition
    public IObservable<IEnumerable<Author>> ObserveAuthorsByCategory(string category)
    {
        var filter = Builders<Author>.Filter.Eq(a => a.Category, category);
        return _collection.Observe(filter);
    }

Single Document Observation
Observing Single Document with Filter

    public IObservable<Author> ObserveFeaturedAuthor()
    {
        return _collection.ObserveSingle(author => author.IsFeatured);
    }

Observing Document by ID
    public IObservable<Author> ObserveAuthorById(AuthorId authorId)
    {
        return _collection.ObserveById<Author, AuthorId>(authorId);
    }

Advanced Usage
With Find Options

    public IObservable<IEnumerable<Author>> ObserveRecentAuthors()
    {
        // Return the ten most recently created authors first.
        var options = new FindOptions<Author>
        {
            Sort = Builders<Author>.Sort.Descending(a => a.CreatedAt),
            Limit = 10
        };

        return _collection.Observe(
            author => author.CreatedAt > DateTime.UtcNow.AddDays(-30),
            options);
    }

Subscribing to Changes
    public class AuthorNotificationService : IDisposable
    {
        private readonly IDisposable _subscription;

        public AuthorNotificationService(IMongoCollection<Author> collection)
        {
            _subscription = collection
                .Observe(author => author.IsActive)
                .Subscribe(
                    authors => HandleAuthorsChanged(authors),
                    error => HandleError(error),
                    () => HandleCompleted());
        }

        private void HandleAuthorsChanged(IEnumerable<Author> authors)
        {
            // React to changes in active authors
            Console.WriteLine($"Active authors updated: {authors.Count()} authors");
        }

        private void HandleError(Exception error)
        {
            // Handle observation errors
            Console.WriteLine($"Error observing authors: {error.Message}");
        }

        private void HandleCompleted()
        {
            // Handle observation completion
            Console.WriteLine("Author observation completed");
        }

        // Stops the observation and releases the underlying change stream.
        public void Dispose()
        {
            _subscription.Dispose();
        }
    }

Integration with Queries
The observation system integrates seamlessly with the Cratis query context, supporting paging and sorting:
    [Route("api/authors")]
    public class AuthorsController : Controller
    {
        private readonly IMongoCollection<Author> _collection;

        public AuthorsController(IMongoCollection<Author> collection)
        {
            _collection = collection;
        }

        [HttpGet("observe")]
        public IObservable<IEnumerable<Author>> ObserveAuthors(
            [FromQuery] int page = 1,
            [FromQuery] int pageSize = 10,
            [FromQuery] string? sortBy = null,
            [FromQuery] string sortDirection = "asc")
        {
            // Query context will be automatically applied to the observation
            return _collection.Observe(author => author.IsPublished);
        }
    }

Change Types Supported
The observation system monitors the following MongoDB change stream operations:
- Insert: New documents added to the collection
- Update: Existing documents modified
- Replace: Documents replaced entirely
- Delete: Documents removed from the collection
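The four operations listed above correspond to members of the MongoDB .NET driver's `ChangeStreamOperationType` enum. As a small illustration, the sketch below maps each one to a description. The `ChangeDescriptions` class is a hypothetical helper for this example, not part of Cratis, and how Cratis treats the remaining operation types is not covered here.

```csharp
using MongoDB.Driver;

public static class ChangeDescriptions
{
    // Hypothetical helper: describes the change stream operations relevant
    // to document observation, using the driver's own enum.
    public static string Describe(ChangeStreamOperationType operationType) => operationType switch
    {
        ChangeStreamOperationType.Insert => "document added",
        ChangeStreamOperationType.Update => "document modified",
        ChangeStreamOperationType.Replace => "document replaced",
        ChangeStreamOperationType.Delete => "document removed",
        // Other operations (for example Invalidate or Drop) are outside the list above.
        _ => "other operation"
    };
}
```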
Performance Considerations
Filtering Early
Always apply filters to reduce the amount of data being observed:

    // Good - filtered observation
    var activeAuthors = collection.Observe(author => author.IsActive);

    // Avoid - observing all then filtering in memory
    var allAuthors = collection.Observe()
        .Select(authors => authors.Where(a => a.IsActive));

Resource Management
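Properly dispose of subscriptions to avoid memory leaks:

    using System.Reactive.Disposables;

    public class AuthorService : IDisposable
    {
        private readonly IMongoCollection<Author> _collection;

        // Collects every subscription so they can be released together.
        private readonly CompositeDisposable _subscriptions = new();

        public AuthorService(IMongoCollection<Author> collection)
        {
            _collection = collection;
        }

        public void StartObserving()
        {
            var subscription = _collection
                .Observe(author => author.IsActive)
                .Subscribe(HandleAuthorsChanged);

            _subscriptions.Add(subscription);
        }

        private void HandleAuthorsChanged(IEnumerable<Author> authors)
        {
            // React to the updated set of active authors
        }

        public void Dispose()
        {
            _subscriptions.Dispose();
        }
    }

Batch Updates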
The system automatically batches rapid successive changes to reduce notification frequency and improve performance.
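How the built-in batching works internally is not described here. If a subscriber wants additional coalescing on its own side, one option is Rx.NET's `Throttle` operator, which forwards only the latest value once no newer value has arrived for a given quiet period. The sketch below assumes that approach; `Coalescing` and `CoalesceBursts` are hypothetical names for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class Coalescing
{
    // Hypothetical helper: during a burst of snapshots, drops the intermediate
    // ones and forwards the latest after the stream has been quiet for quietPeriod.
    public static IObservable<IEnumerable<T>> CoalesceBursts<T>(
        this IObservable<IEnumerable<T>> snapshots,
        TimeSpan quietPeriod)
    {
        return snapshots.Throttle(quietPeriod);
    }
}
```

A subscriber could apply it as `collection.Observe(author => author.IsActive).CoalesceBursts(TimeSpan.FromMilliseconds(250))`. The trade-off is added latency of up to the quiet period before each notification.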
Error Handling
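Robust error handling is essential when working with observables:

    public void ObserveWithErrorHandling()
    {
        _collection
            .Observe(author => author.IsActive)
            // Retry(3) allows three subscription attempts in total (the first plus two retries).
            .Retry(3)
            // After the final failure, log the error and complete with an empty sequence.
            // Logging here matters: once Catch swallows the error, an OnError handler
            // passed to Subscribe would never be called.
            .Catch<IEnumerable<Author>, Exception>(error =>
            {
                _logger.LogError(error, "Failed to observe authors");
                return Observable.Empty<IEnumerable<Author>>();
            })
            .Subscribe(authors => HandleAuthors(authors));
    }

Best Practices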
Use Specific Filters
Apply filters to observe only the data you need:

    // Good - specific filter applied by the database
    collection.Observe(doc => doc.Status == "Active" && doc.Type == "Premium");

    // Avoid - broad observation with filtering in memory
    collection.Observe()
        .Select(docs => docs.Where(d => d.Status == "Active" && d.Type == "Premium"));

Dispose Properly
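Always dispose subscriptions when they’re no longer needed:

    public class ComponentWithObservation : IDisposable
    {
        private readonly IMongoCollection<Author> _collection;
        private IDisposable? _subscription;

        public ComponentWithObservation(IMongoCollection<Author> collection)
        {
            _collection = collection;
        }

        public void StartObserving()
        {
            _subscription = _collection.Observe().Subscribe(HandleData);
        }

        private void HandleData(IEnumerable<Author> authors)
        {
            // React to the updated documents
        }

        public void Dispose()
        {
            // Null until StartObserving has been called
            _subscription?.Dispose();
        }
    }

Handle Connection Issues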
Implement retry logic for connection interruptions:

    collection
        .Observe(filter)
        .RetryWhen(errors => errors
            // Wait 5 seconds after each error before resubscribing
            .SelectMany(error => Observable.Timer(TimeSpan.FromSeconds(5)))
            // Stop retrying after 5 errors
            .Take(5))
        .Subscribe(HandleData);

Integration with Dependency Injection
Register observation services in your DI container:

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IAuthorObservationService, AuthorObservationService>();
    }

    public interface IAuthorObservationService
    {
        IObservable<IEnumerable<Author>> ObserveActiveAuthors();
        IObservable<Author> ObserveAuthorById(AuthorId id);
    }

    public class AuthorObservationService : IAuthorObservationService
    {
        private readonly IMongoCollection<Author> _collection;

        public AuthorObservationService(IMongoCollection<Author> collection)
        {
            _collection = collection;
        }

        public IObservable<IEnumerable<Author>> ObserveActiveAuthors()
        {
            return _collection.Observe(author => author.IsActive);
        }

        public IObservable<Author> ObserveAuthorById(AuthorId id)
        {
            return _collection.ObserveById<Author, AuthorId>(id);
        }
    }

Troubleshooting
Common Issues
Change Stream Not Starting
- Ensure your MongoDB version supports change streams (3.6+)
- Verify the deployment is a replica set or sharded cluster; a standalone server does not support change streams
- Check that the user has the changeStream and find privileges on the observed collection
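To check the deployment type from code, one option is to run the server's `hello` command and inspect the reply. This is a rough diagnostic sketch, not a Cratis API: `ChangeStreamDiagnostics` and its methods are hypothetical names, and servers that predate the `hello` command need the legacy `isMaster` command instead.

```csharp
using MongoDB.Bson;
using MongoDB.Driver;

public static class ChangeStreamDiagnostics
{
    // Hypothetical helper: interprets a "hello" reply.
    // Replica set members report "setName"; a mongos router reports msg = "isdbgrid".
    public static bool SupportsChangeStreams(BsonDocument helloReply)
    {
        var isReplicaSetMember = helloReply.Contains("setName");
        var isShardedCluster = helloReply.GetValue("msg", "").ToString() == "isdbgrid";
        return isReplicaSetMember || isShardedCluster;
    }

    // Runs the command against a live server and applies the check above.
    public static bool SupportsChangeStreams(IMongoDatabase database)
    {
        var reply = database.RunCommand<BsonDocument>(new BsonDocument("hello", 1));
        return SupportsChangeStreams(reply);
    }
}
```

A `false` result on a local development machine usually means the server was started as a standalone instance rather than as a single-node replica set.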
Memory Leaks
- Always dispose subscriptions when no longer needed
- Use CompositeDisposable for managing multiple subscriptions
- Implement IDisposable in classes that create observations
Performance Issues
- Apply filters at the database level, not in memory
- Limit the scope of observations to necessary data
- Monitor change stream performance in MongoDB logs
Debugging
Enable logging to troubleshoot observation issues:

    services.Configure<LoggerFilterOptions>(options =>
    {
        options.AddFilter("MongoDB.Driver.MongoCollection", LogLevel.Debug);
    });

This will provide detailed information about change stream operations and any issues encountered during observation.