Observable Queries
Observable queries provide real-time data streaming using WebSockets, enabling reactive user experiences where data changes are pushed to clients as they occur. You achieve this by returning ISubject<T> from your controller actions.
The ISubject<T> return type automatically establishes a WebSocket connection between the server and client, enabling real-time data updates. This integrates seamlessly with the ObservableQuery construct in the frontend through the proxy generator, creating strongly-typed reactive data flows.
Basic Observable Query
Section titled “Basic Observable Query”The key to an observable query is to return the ISubject<T> produced by the MongoDB
Observe() extension method. Observe() watches the collection and pushes a fresh
snapshot every time the data changes — Arc establishes and manages the WebSocket
connection for you. The examples below use _collection, an injected
IMongoCollection<DebitAccount> field on the controller.
[HttpGet("observable")]public ISubject<IEnumerable<DebitAccount>> AllAccountsObservable(){ return _collection.Observe();}Observable with Arguments
Section titled “Observable with Arguments”Observable queries can accept arguments just like regular queries:
[HttpGet("owner/{ownerId}/observable")]public ISubject<IEnumerable<DebitAccount>> GetAccountsByOwnerObservable(CustomerId ownerId){ return _collection.Observe(account => account.Owner == ownerId);}
[HttpGet("filtered-observable")]public ISubject<IEnumerable<DebitAccount>> GetFilteredAccountsObservable( [FromQuery] decimal minBalance = 0){ return _collection.Observe(account => account.Balance >= minBalance);}Single Object Observable
Section titled “Single Object Observable”For observing changes to a single object:
[HttpGet("{id}/observable")]public ISubject<DebitAccount> GetAccountObservable(AccountId id){ return _collection.ObserveSingle(account => account.Id == id);}Custom Observable Logic
Section titled “Custom Observable Logic”For computed or derived data, build on top of the collection’s observable. Observe()
returns an IObservable<IEnumerable<T>> that emits a new snapshot whenever the data
changes, so you can use System.Reactive operators such as Select to project each
snapshot into a computed shape:
using System.Reactive.Linq;using System.Reactive.Subjects;
[HttpGet("summary")]public ISubject<AccountSummary> GetAccountSummaryObservable(){ var summary = new ReplaySubject<AccountSummary>(1);
_collection.Observe() .Select(accounts => new AccountSummary(accounts.Count(), accounts.Sum(a => a.Balance))) .Subscribe(summary);
return summary;}Multiple Data Source Observables
Section titled “Multiple Data Source Observables”Observe changes across multiple data sources:
using System.Reactive.Linq;using System.Reactive.Subjects;
public record CombinedData(IEnumerable<DebitAccount> Accounts, IEnumerable<Customer> Customers);
[HttpGet("combined-observable")]public ISubject<CombinedData> GetCombinedDataObservable(){ var combined = new ReplaySubject<CombinedData>(1);
_accountCollection.Observe() .CombineLatest(_customerCollection.Observe(), (accounts, customers) => new CombinedData(accounts, customers)) .Subscribe(combined);
return combined;}Observable with Computed Values
Section titled “Observable with Computed Values”Create observables that compute derived values:
using System.Reactive.Linq;using System.Reactive.Subjects;
[HttpGet("computed-metrics")]public ISubject<AccountMetrics> GetAccountMetricsObservable(){ var metrics = new ReplaySubject<AccountMetrics>(1);
_collection.Observe() .Select(accounts => new AccountMetrics( TotalAccounts: accounts.Count(), TotalBalance: accounts.Sum(a => a.Balance), AverageBalance: accounts.Any() ? accounts.Average(a => a.Balance) : 0, ActiveAccounts: accounts.Count(a => a.Balance > 0), HighValueAccounts: accounts.Count(a => a.Balance > 100000))) .Subscribe(metrics);
return metrics;}Filtered Observables with Dynamic Criteria
Section titled “Filtered Observables with Dynamic Criteria”Allow clients to specify filter criteria for observables:
using System.Reactive.Linq;using System.Reactive.Subjects;
public record ObservableFilter( decimal? MinBalance, decimal? MaxBalance, string? NamePattern, CustomerId? OwnerId);
[HttpPost("filtered-observable")]public ISubject<IEnumerable<DebitAccount>> GetFilteredObservable([FromBody] ObservableFilter filter){ var predicate = BuildFilterPredicate(filter); var filtered = new ReplaySubject<IEnumerable<DebitAccount>>(1);
_collection.Observe() .Select(accounts => accounts.Where(predicate).ToList().AsEnumerable()) .Subscribe(filtered);
return filtered;}
Func<DebitAccount, bool> BuildFilterPredicate(ObservableFilter filter){ return account => (!filter.MinBalance.HasValue || account.Balance >= filter.MinBalance.Value) && (!filter.MaxBalance.HasValue || account.Balance <= filter.MaxBalance.Value) && (string.IsNullOrEmpty(filter.NamePattern) || account.Name.Contains(filter.NamePattern, StringComparison.OrdinalIgnoreCase)) && (!filter.OwnerId.HasValue || account.Owner == filter.OwnerId.Value);}Throttled Observables
Section titled “Throttled Observables”For high-frequency changes, use the System.Reactive Sample operator to emit at most one
update per time window, preventing a flood of changes from overwhelming clients:
using System.Reactive.Linq;using System.Reactive.Subjects;
[HttpGet("throttled-observable")]public ISubject<IEnumerable<DebitAccount>> GetThrottledObservable( [FromQuery] int throttleMs = 1000){ var throttled = new ReplaySubject<IEnumerable<DebitAccount>>(1);
_collection.Observe() .Sample(TimeSpan.FromMilliseconds(throttleMs)) .Subscribe(throttled);
return throttled;}Error Handling in Observables
Section titled “Error Handling in Observables”Errors raised while observing the collection propagate through the observable’s error
channel automatically. Use the System.Reactive Do operator to log them as they flow
through, without altering the stream:
using System.Reactive.Linq;using System.Reactive.Subjects;
[HttpGet("robust-observable")]public ISubject<IEnumerable<DebitAccount>> GetRobustObservable(){ var observable = new ReplaySubject<IEnumerable<DebitAccount>>(1);
_collection.Observe() .Do( onNext: _ => { }, onError: ex => _logger.LogError(ex, "Error observing accounts")) .Subscribe(observable);
return observable;}Authentication and Authorization
Section titled “Authentication and Authorization”Observable queries support the same authentication and authorization as regular queries:
[Authorize][HttpGet("secure-observable")]public ISubject<IEnumerable<DebitAccount>> GetSecureObservable(){ // Only authenticated users can subscribe return _collection.Observe();}
[Authorize(Roles = "Admin")][HttpGet("admin-observable")]public ISubject<IEnumerable<DebitAccount>> GetAdminObservable(){ // Only admin users can subscribe return _collection.Observe();}Best Practices for Observable Queries
Section titled “Best Practices for Observable Queries”- Prefer the
Observe()/ObserveSingle()extension methods — they handle change monitoring, initial data, and cleanup for you - Project with System.Reactive operators (
Select,CombineLatest,Sample) when you need computed, combined, or throttled streams - Use appropriate filters to minimize unnecessary data transmission
- Consider throttling with
Samplefor high-frequency changes to prevent overwhelming clients - Let errors propagate through the observable’s error channel; use
Doto observe them for logging - Use authentication to control who can subscribe to observable endpoints
- Monitor performance and consider the impact of many concurrent subscriptions
Connection Management
Section titled “Connection Management”Arc automatically handles WebSocket connections for observable queries:
- Connection establishment - Automatic WebSocket upgrade for observable endpoints
- Message serialization - Automatic JSON serialization of observable data
- Connection cleanup - Proper disposal of resources when clients disconnect
- Reconnection handling - Clients can reconnect and resume subscriptions
Waiting for the First HTTP Result
Section titled “Waiting for the First HTTP Result”The regular HTTP GET endpoint for a controller-based observable query returns the current snapshot as JSON. If the observable has not produced a value yet, add waitForFirstResult=true to keep the HTTP request open until the first item arrives.
Arc applies a timeout while waiting. By default the timeout is 30 seconds. You can override it with waitForFirstResultTimeout, expressed in seconds.
curl "https://localhost:5001/api/observable-controller-queries/observe/delayed-single?waitForFirstResult=true"curl "https://localhost:5001/api/observable-controller-queries/observe/delayed-single?waitForFirstResult=true&waitForFirstResultTimeout=10"This makes it easy to debug observable controller actions with cURL without switching to WebSockets or SSE.
See Use Observable Queries with cURL for snapshot, SSE, and long-polling workflows.
Frontend Integration
Section titled “Frontend Integration”Observable queries integrate seamlessly with frontend frameworks through the proxy generator and the ObservableQuery construct:
accountsObservable.subscribe(accounts => { // Handle real-time account updates updateUI(accounts);});The ISubject<T> return type automatically establishes and manages WebSocket connections, providing:
Important: The
Observe()andObserveSingle()extension methods manage their own subscriptions and clean up automatically when a client disconnects, so you do not need to write any teardown code.
- Automatic connection management - WebSocket connections are established and maintained automatically
- Strongly-typed data flow - Full TypeScript support through the proxy generator
- Reactive integration - Seamless integration with React hooks like
useObservableQuery() - Reconnection handling - Automatic reconnection and state recovery on connection loss
Note: The proxy generator automatically creates TypeScript types for your observable queries, making them strongly typed on the frontend as well.