Skip to content

Observing Collections

The MongoDB extensions in Cratis Applications provide powerful reactive programming capabilities through collection observation. This feature allows you to create observables that automatically notify subscribers when documents in a MongoDB collection change, providing real-time updates to your application.

Collection observation leverages MongoDB’s Change Streams feature combined with Reactive Extensions (Rx.NET) to provide a seamless way to watch for changes in your data. The system automatically handles initial data loading, change detection, and notification of observers.

  • Real-time Updates: Automatically receive notifications when documents change
  • Filtering Support: Observe only documents matching specific criteria
  • Multiple Observation Types: Observe collections, single documents, or documents by ID
  • Query Context Awareness: Integrates with Cratis query context for paging and sorting
  • Automatic Cleanup: Proper resource management and cleanup when observations are disposed
public class AuthorService
{
private readonly IMongoCollection<Author> _collection;
public AuthorService(IMongoCollection<Author> collection)
{
_collection = collection;
}
public IObservable<IEnumerable<Author>> ObserveAllAuthors()
{
return _collection.Observe();
}
}
public IObservable<IEnumerable<Author>> ObserveActiveAuthors()
{
return _collection.Observe(author => author.IsActive);
}
public IObservable<IEnumerable<Author>> ObserveAuthorsByCategory(string category)
{
var filter = Builders<Author>.Filter.Eq(a => a.Category, category);
return _collection.Observe(filter);
}
public IObservable<Author> ObserveFeaturedAuthor()
{
return _collection.ObserveSingle(author => author.IsFeatured);
}
public IObservable<Author> ObserveAuthorById(AuthorId authorId)
{
return _collection.ObserveById<Author, AuthorId>(authorId);
}
public IObservable<IEnumerable<Author>> ObserveRecentAuthors()
{
var options = new FindOptions<Author>
{
Sort = Builders<Author>.Sort.Descending(a => a.CreatedAt),
Limit = 10
};
return _collection.Observe(
author => author.CreatedAt > DateTime.UtcNow.AddDays(-30),
options);
}
public class AuthorNotificationService
{
private readonly IDisposable _subscription;
public AuthorNotificationService(IMongoCollection<Author> collection)
{
_subscription = collection
.Observe(author => author.IsActive)
.Subscribe(
authors => HandleAuthorsChanged(authors),
error => HandleError(error),
() => HandleCompleted());
}
private void HandleAuthorsChanged(IEnumerable<Author> authors)
{
// React to changes in active authors
Console.WriteLine($"Active authors updated: {authors.Count()} authors");
}
private void HandleError(Exception error)
{
// Handle observation errors
Console.WriteLine($"Error observing authors: {error.Message}");
}
private void HandleCompleted()
{
// Handle observation completion
Console.WriteLine("Author observation completed");
}
public void Dispose()
{
_subscription?.Dispose();
}
}

The observation system integrates seamlessly with the Cratis query context, supporting paging and sorting:

[Route("api/authors")]
public class AuthorsController : Controller
{
private readonly IMongoCollection<Author> _collection;
public AuthorsController(IMongoCollection<Author> collection)
{
_collection = collection;
}
[HttpGet("observe")]
public IObservable<IEnumerable<Author>> ObserveAuthors(
[FromQuery] int page = 1,
[FromQuery] int pageSize = 10,
[FromQuery] string? sortBy = null,
[FromQuery] string sortDirection = "asc")
{
// Query context will be automatically applied to the observation
return _collection.Observe(author => author.IsPublished);
}
}

The observation system monitors the following MongoDB change stream operations:

  • Insert: New documents added to the collection
  • Update: Existing documents modified
  • Replace: Documents replaced entirely
  • Delete: Documents removed from the collection

Always apply filters to reduce the amount of data being observed:

// Good - filtered observation
var activeAuthors = collection.Observe(author => author.IsActive);
// Avoid - observing all then filtering in memory
var allAuthors = collection.Observe()
.Select(authors => authors.Where(a => a.IsActive));

Properly dispose of subscriptions to avoid memory leaks:

public class AuthorService : IDisposable
{
private readonly CompositeDisposable _subscriptions = new();
public void StartObserving()
{
var subscription = _collection
.Observe(author => author.IsActive)
.Subscribe(HandleAuthorsChanged);
_subscriptions.Add(subscription);
}
public void Dispose()
{
_subscriptions?.Dispose();
}
}

The system automatically batches rapid successive changes to reduce notification frequency and improve performance.

Robust error handling is essential when working with observables:

public void ObserveWithErrorHandling()
{
_collection
.Observe(author => author.IsActive)
.Retry(3) // Retry up to 3 times on error
.Catch(Observable.Empty<IEnumerable<Author>>()) // Continue with empty on final failure
.Subscribe(
authors => HandleAuthors(authors),
error => _logger.LogError(error, "Failed to observe authors"));
}

Apply filters to observe only the data you need:

// Good - specific filter
collection.Observe(doc => doc.Status == "Active" && doc.Type == "Premium");
// Avoid - broad observation with post-filtering
collection.Observe().Where(docs => docs.All(d => d.Status == "Active"));

Always dispose subscriptions when they’re no longer needed:

public class ComponentWithObservation : IDisposable
{
private IDisposable? _subscription;
public void StartObserving()
{
_subscription = collection.Observe().Subscribe(HandleData);
}
public void Dispose()
{
_subscription?.Dispose();
}
}

Implement retry logic for connection interruptions:

collection
.Observe(filter)
.RetryWhen(errors => errors
.SelectMany(error => Observable.Timer(TimeSpan.FromSeconds(5)))
.Take(5)) // Retry 5 times with 5-second intervals
.Subscribe(HandleData);

Register observation services in your DI container:

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IAuthorObservationService, AuthorObservationService>();
}
public interface IAuthorObservationService
{
IObservable<IEnumerable<Author>> ObserveActiveAuthors();
IObservable<Author> ObserveAuthorById(AuthorId id);
}
public class AuthorObservationService : IAuthorObservationService
{
private readonly IMongoCollection<Author> _collection;
public AuthorObservationService(IMongoCollection<Author> collection)
{
_collection = collection;
}
public IObservable<IEnumerable<Author>> ObserveActiveAuthors()
{
return _collection.Observe(author => author.IsActive);
}
public IObservable<Author> ObserveAuthorById(AuthorId id)
{
return _collection.ObserveById<Author, AuthorId>(id);
}
}
  • Ensure MongoDB version supports change streams (3.6+)
  • Verify replica set configuration
  • Check user permissions for change stream operations
  • Always dispose subscriptions when no longer needed
  • Use CompositeDisposable for managing multiple subscriptions
  • Implement IDisposable in classes that create observations
  • Apply filters at the database level, not in memory
  • Limit the scope of observations to necessary data
  • Monitor change stream performance in MongoDB logs

Enable logging to troubleshoot observation issues:

services.Configure<LoggerFilterOptions>(options =>
{
options.AddFilter("MongoDB.Driver.MongoCollection", LogLevel.Debug);
});

This will provide detailed information about change stream operations and any issues encountered during observation.