Observable Queries

Observable queries provide real-time data streaming using WebSockets, enabling reactive user experiences where data changes are pushed to clients as they occur. You achieve this by returning ISubject<T> from your controller actions.

The ISubject<T> return type automatically establishes a WebSocket connection between the server and client, enabling real-time data updates. This integrates seamlessly with the ObservableQuery construct in the frontend through the proxy generator, creating strongly-typed reactive data flows.

The key to an observable query is to return the ISubject<T> produced by the MongoDB Observe() extension method. Observe() watches the collection and pushes a fresh snapshot every time the data changes. The examples below use _collection, an injected IMongoCollection<DebitAccount> field on the controller.

[HttpGet("observable")]
public ISubject<IEnumerable<DebitAccount>> AllAccountsObservable()
{
    return _collection.Observe();
}

Observable queries can accept arguments just like regular queries:

[HttpGet("owner/{ownerId}/observable")]
public ISubject<IEnumerable<DebitAccount>> GetAccountsByOwnerObservable(CustomerId ownerId)
{
    return _collection.Observe(account => account.Owner == ownerId);
}

[HttpGet("filtered-observable")]
public ISubject<IEnumerable<DebitAccount>> GetFilteredAccountsObservable(
    [FromQuery] decimal minBalance = 0)
{
    return _collection.Observe(account => account.Balance >= minBalance);
}

For observing changes to a single object:

[HttpGet("{id}/observable")]
public ISubject<DebitAccount> GetAccountObservable(AccountId id)
{
    return _collection.ObserveSingle(account => account.Id == id);
}

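For computed or derived data, build on top of the collection’s observable. The ISubject<IEnumerable<T>> returned by Observe() is also an IObservable<IEnumerable<T>> that emits a new snapshot whenever the data changes, so you can use System.Reactive operators such as Select to project each snapshot into a computed shape. Because these operators return a plain IObservable<T>, the examples subscribe a ReplaySubject<T> with a buffer size of 1 to the projected stream and return that subject, which also replays the most recent value to any subscriber that attaches after it was produced: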

using System.Reactive.Linq;
using System.Reactive.Subjects;

[HttpGet("summary")]
public ISubject<AccountSummary> GetAccountSummaryObservable()
{
    var summary = new ReplaySubject<AccountSummary>(1);
    _collection.Observe()
        .Select(accounts => new AccountSummary(accounts.Count(), accounts.Sum(a => a.Balance)))
        .Subscribe(summary);
    return summary;
}

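Observe changes across multiple data sources with CombineLatest, which emits once every source has produced a value and again whenever any of them changes. The example assumes two injected collections, _accountCollection and _customerCollection: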

using System.Reactive.Linq;
using System.Reactive.Subjects;

public record CombinedData(IEnumerable<DebitAccount> Accounts, IEnumerable<Customer> Customers);

[HttpGet("combined-observable")]
public ISubject<CombinedData> GetCombinedDataObservable()
{
    var combined = new ReplaySubject<CombinedData>(1);
    _accountCollection.Observe()
        .CombineLatest(_customerCollection.Observe(),
            (accounts, customers) => new CombinedData(accounts, customers))
        .Subscribe(combined);
    return combined;
}

Create observables that compute derived values:

using System.Reactive.Linq;
using System.Reactive.Subjects;

[HttpGet("computed-metrics")]
public ISubject<AccountMetrics> GetAccountMetricsObservable()
{
    var metrics = new ReplaySubject<AccountMetrics>(1);
    _collection.Observe()
        .Select(accounts => new AccountMetrics(
            TotalAccounts: accounts.Count(),
            TotalBalance: accounts.Sum(a => a.Balance),
            AverageBalance: accounts.Any() ? accounts.Average(a => a.Balance) : 0,
            ActiveAccounts: accounts.Count(a => a.Balance > 0),
            HighValueAccounts: accounts.Count(a => a.Balance > 100000)))
        .Subscribe(metrics);
    return metrics;
}

Filtered Observables with Dynamic Criteria

Allow clients to specify filter criteria for observables:

using System.Reactive.Linq;
using System.Reactive.Subjects;

public record ObservableFilter(
    decimal? MinBalance,
    decimal? MaxBalance,
    string? NamePattern,
    CustomerId? OwnerId);

[HttpPost("filtered-observable")]
public ISubject<IEnumerable<DebitAccount>> GetFilteredObservable([FromBody] ObservableFilter filter)
{
    var predicate = BuildFilterPredicate(filter);
    var filtered = new ReplaySubject<IEnumerable<DebitAccount>>(1);
    _collection.Observe()
        .Select(accounts => accounts.Where(predicate).ToList().AsEnumerable())
        .Subscribe(filtered);
    return filtered;
}

Func<DebitAccount, bool> BuildFilterPredicate(ObservableFilter filter)
{
    return account =>
        (!filter.MinBalance.HasValue || account.Balance >= filter.MinBalance.Value) &&
        (!filter.MaxBalance.HasValue || account.Balance <= filter.MaxBalance.Value) &&
        (string.IsNullOrEmpty(filter.NamePattern) ||
            account.Name.Contains(filter.NamePattern, StringComparison.OrdinalIgnoreCase)) &&
        (!filter.OwnerId.HasValue || account.Owner == filter.OwnerId.Value);
}

For high-frequency changes, use the System.Reactive Sample operator to emit at most one update per time window, preventing a flood of changes from overwhelming clients:

using System.Reactive.Linq;
using System.Reactive.Subjects;

[HttpGet("throttled-observable")]
public ISubject<IEnumerable<DebitAccount>> GetThrottledObservable(
    [FromQuery] int throttleMs = 1000)
{
    var throttled = new ReplaySubject<IEnumerable<DebitAccount>>(1);
    _collection.Observe()
        .Sample(TimeSpan.FromMilliseconds(throttleMs))
        .Subscribe(throttled);
    return throttled;
}
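
To make "at most one update per time window" concrete from the client's point of view, the sketch below models which snapshots would survive sampling. It is written in TypeScript to match the frontend and is only a simplified model, not the System.Reactive implementation of Sample; the names TimedValue and sampleLatest are hypothetical and exist only for this illustration.

```typescript
// A value together with the time (in ms) at which the source emitted it.
interface TimedValue<T> {
    atMs: number;
    value: T;
}

// Simplified model of periodic sampling: at every tick (periodMs, 2 * periodMs, ...)
// emit the latest value seen since the previous tick, or nothing if there was none.
function sampleLatest<T>(events: TimedValue<T>[], periodMs: number): TimedValue<T>[] {
    if (periodMs <= 0) {
        throw new RangeError("periodMs must be positive");
    }
    const sorted = [...events].sort((a, b) => a.atMs - b.atMs);
    const emitted: TimedValue<T>[] = [];
    let index = 0;
    // Keep ticking until every source event has been consumed.
    for (let tick = periodMs; index < sorted.length; tick += periodMs) {
        let latest: TimedValue<T> | undefined;
        while (index < sorted.length && sorted[index].atMs <= tick) {
            latest = sorted[index];
            index++;
        }
        if (latest !== undefined) {
            emitted.push({ atMs: tick, value: latest.value });
        }
    }
    return emitted;
}

// Four changes arrive; with a 1000 ms window only two snapshots reach the client.
const received = sampleLatest(
    [
        { atMs: 100, value: "v1" },
        { atMs: 200, value: "v2" },
        { atMs: 900, value: "v3" },
        { atMs: 2500, value: "v4" },
    ],
    1000,
);
console.log(received.map(e => `${e.atMs}:${e.value}`).join(", ")); // prints "1000:v3, 3000:v4"
```

In this model the intermediate snapshots v1 and v2 are dropped, and a window without changes produces no message at all, which is the trade-off to keep in mind when choosing the window size.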

Errors raised while observing the collection propagate through the observable’s error channel automatically. Use the System.Reactive Do operator to log them as they flow through, without altering the stream:

using System.Reactive.Linq;
using System.Reactive.Subjects;

[HttpGet("robust-observable")]
public ISubject<IEnumerable<DebitAccount>> GetRobustObservable()
{
    var observable = new ReplaySubject<IEnumerable<DebitAccount>>(1);
    _collection.Observe()
        .Do(
            onNext: _ => { },
            onError: ex => _logger.LogError(ex, "Error observing accounts"))
        .Subscribe(observable);
    return observable;
}

Observable queries support the same authentication and authorization as regular queries:

[Authorize]
[HttpGet("secure-observable")]
public ISubject<IEnumerable<DebitAccount>> GetSecureObservable()
{
    // Only authenticated users can subscribe
    return _collection.Observe();
}

[Authorize(Roles = "Admin")]
[HttpGet("admin-observable")]
public ISubject<IEnumerable<DebitAccount>> GetAdminObservable()
{
    // Only admin users can subscribe
    return _collection.Observe();
}

When building observable queries, keep these practices in mind:

  1. Prefer the Observe() / ObserveSingle() extension methods; they handle change monitoring, initial data, and cleanup for you
  2. Project with System.Reactive operators (Select, CombineLatest, Sample) when you need computed, combined, or throttled streams
  3. Use appropriate filters to minimize unnecessary data transmission
  4. Consider throttling with Sample for high-frequency changes to prevent overwhelming clients
  5. Let errors propagate through the observable’s error channel; use Do to observe them for logging
  6. Use authentication to control who can subscribe to observable endpoints
  7. Monitor performance and consider the impact of many concurrent subscriptions

Arc automatically handles WebSocket connections for observable queries:

  • Connection establishment - Automatic WebSocket upgrade for observable endpoints
  • Message serialization - Automatic JSON serialization of observable data
  • Connection cleanup - Proper disposal of resources when clients disconnect
  • Reconnection handling - Clients can reconnect and resume subscriptions

The regular HTTP GET endpoint for a controller-based observable query returns the current snapshot as JSON. If the observable has not produced a value yet, add waitForFirstResult=true to keep the HTTP request open until the first item arrives.

Arc applies a timeout while waiting. By default the timeout is 30 seconds. You can override it with waitForFirstResultTimeout, expressed in seconds.

curl "https://localhost:5001/api/observable-controller-queries/observe/delayed-single?waitForFirstResult=true"
curl "https://localhost:5001/api/observable-controller-queries/observe/delayed-single?waitForFirstResult=true&waitForFirstResultTimeout=10"

This makes it easy to debug observable controller actions with cURL without switching to WebSockets or SSE.
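
The same snapshot request can also be scripted. The following TypeScript sketch builds the URL with the two query parameters described above and fetches the snapshot. The helper names (snapshotUrl, fetchSnapshot) and the SnapshotOptions shape are hypothetical, the endpoint is the example URL from the cURL commands, and the code assumes a runtime with a global fetch, such as a modern browser or Node.js 18 or later.

```typescript
// Hypothetical options mirroring the query parameters described above.
interface SnapshotOptions {
    waitForFirstResult?: boolean;
    // Timeout in seconds; per the text above, the server default applies when omitted.
    waitForFirstResultTimeoutSeconds?: number;
}

// Builds the snapshot URL for an observable query endpoint.
function snapshotUrl(endpoint: string, options: SnapshotOptions = {}): string {
    const url = new URL(endpoint);
    if (options.waitForFirstResult) {
        url.searchParams.set("waitForFirstResult", "true");
    }
    if (options.waitForFirstResultTimeoutSeconds !== undefined) {
        url.searchParams.set(
            "waitForFirstResultTimeout",
            String(options.waitForFirstResultTimeoutSeconds),
        );
    }
    return url.toString();
}

// Fetches the current snapshot as JSON; rejects on a non-success status code.
async function fetchSnapshot<T>(endpoint: string, options: SnapshotOptions = {}): Promise<T> {
    const response = await fetch(snapshotUrl(endpoint, options));
    if (!response.ok) {
        throw new Error(`Snapshot request failed with status ${response.status}`);
    }
    return (await response.json()) as T;
}

// Reproduces the URL used by the second cURL command.
const exampleUrl = snapshotUrl(
    "https://localhost:5001/api/observable-controller-queries/observe/delayed-single",
    { waitForFirstResult: true, waitForFirstResultTimeoutSeconds: 10 },
);
console.log(exampleUrl);
```

Calling fetchSnapshot with waitForFirstResult set keeps the request open in the same way the cURL command does, so a script can wait for the first item without opening a WebSocket.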

See Use Observable Queries with cURL for snapshot, SSE, and long-polling workflows.

Observable queries integrate seamlessly with frontend frameworks through the proxy generator and the ObservableQuery construct:

accountsObservable.subscribe(accounts => {
    // Handle real-time account updates
    updateUI(accounts);
});
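
Because each update is a complete snapshot rather than a delta, a handler can simply replace its local state. The sketch below illustrates that pattern under stated assumptions: SnapshotSource<T> is a hypothetical stand-in for whatever the proxy generator produces (the generated API may differ), the DebitAccount fields are illustrative, and FakeSource exists only to exercise the example.

```typescript
// Hypothetical stand-in for a generated observable query; the real generated API may differ.
interface SnapshotSource<T> {
    subscribe(callback: (snapshot: T) => void): void;
}

// Illustrative client-side shape of an account; field names are assumptions.
interface DebitAccount {
    id: string;
    name: string;
    balance: number;
}

// Keeps only the most recent snapshot; every update replaces the previous one.
class LatestSnapshot<T> {
    private current: T;
    public updates = 0;

    constructor(initial: T, source: SnapshotSource<T>) {
        this.current = initial;
        source.subscribe(snapshot => {
            this.current = snapshot; // replace, do not merge
            this.updates++;
        });
    }

    get value(): T {
        return this.current;
    }
}

// In-memory fake used only to exercise the sketch.
class FakeSource<T> implements SnapshotSource<T> {
    private callbacks: Array<(snapshot: T) => void> = [];

    subscribe(callback: (snapshot: T) => void): void {
        this.callbacks.push(callback);
    }

    push(snapshot: T): void {
        this.callbacks.forEach(callback => callback(snapshot));
    }
}

const source = new FakeSource<DebitAccount[]>();
const accounts = new LatestSnapshot<DebitAccount[]>([], source);
source.push([{ id: "1", name: "Savings", balance: 100 }]);
source.push([
    { id: "1", name: "Savings", balance: 150 },
    { id: "2", name: "Checking", balance: 20 },
]);
console.log(accounts.value.length, accounts.updates); // prints "2 2"
```

Replacing state wholesale keeps the client logic free of merge rules, at the cost of re-rendering from the full list on every change.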

The ISubject<T> return type automatically establishes and manages WebSocket connections, providing:

  • Automatic connection management - WebSocket connections are established and maintained automatically
  • Strongly-typed data flow - Full TypeScript support through the proxy generator
  • Reactive integration - Seamless integration with React hooks like useObservableQuery()
  • Reconnection handling - Automatic reconnection and state recovery on connection loss

Important: The Observe() and ObserveSingle() extension methods manage their own subscriptions and clean up automatically when a client disconnects, so you do not need to write any teardown code.

Note: The proxy generator automatically creates TypeScript types for your observable queries, making them strongly typed on the frontend as well.