Skip to content

Observable Queries

Observable queries in model-bound scenarios provide the same real-time data streaming capabilities as controller-based queries, but implemented as static methods directly on your read model records. You achieve this by returning ISubject<T> from static methods on your [ReadModel] decorated record.

The ISubject<T> return type automatically establishes a WebSocket connection between the server and client, enabling real-time data updates. This integrates seamlessly with the ObservableQuery construct in the frontend through the proxy generator, creating strongly-typed reactive data flows.

Define an observable query as a static method on your read model and return the ISubject<T> produced by the MongoDB Observe() extension method. Observe() watches the collection and pushes a fresh snapshot every time the underlying data changes — Arc handles the WebSocket connection and the change monitoring for you:

[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAllAccountsObservable(IMongoCollection<DebitAccount> collection)
{
return collection.Observe();
}
public static IEnumerable<DebitAccount> GetAllAccounts(IMongoCollection<DebitAccount> collection)
{
return collection.Find(_ => true).ToList();
}
}

Observable queries can accept arguments just like regular queries. Pass a filter expression to Observe() and the observable only streams the matching documents:

[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAccountsByOwnerObservable(
CustomerId ownerId,
IMongoCollection<DebitAccount> collection)
{
return collection.Observe(account => account.Owner == ownerId);
}
public static ISubject<IEnumerable<DebitAccount>> GetFilteredAccountsObservable(
IMongoCollection<DebitAccount> collection,
decimal minBalance = 0)
{
return collection.Observe(account => account.Balance >= minBalance);
}
}

For observing changes to a single object, return ISubject<T> (not a collection) and use ObserveSingle(), which streams the latest version of the one matching document:

[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<DebitAccount> GetAccountObservable(
AccountId id,
IMongoCollection<DebitAccount> collection)
{
return collection.ObserveSingle(account => account.Id == id);
}
}

For computed or derived data, build on top of the collection’s observable. Observe() returns an IObservable<IEnumerable<T>> that emits a new snapshot whenever the data changes, so you can use System.Reactive operators such as Select to project each snapshot into a computed shape. The result is itself an ISubject<T> via ReplaySubject<T>, so subscribers always receive the most recent computed value:

using System.Reactive.Linq;
using System.Reactive.Subjects;
public record AccountSummary(int Count, decimal TotalBalance);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<AccountSummary> GetAccountSummaryObservable(IMongoCollection<DebitAccount> collection)
{
var summary = new ReplaySubject<AccountSummary>(1);
collection.Observe()
.Select(accounts => new AccountSummary(accounts.Count(), accounts.Sum(a => a.Balance)))
.Subscribe(summary);
return summary;
}
}

Observe changes across multiple data sources by injecting multiple collections and combining their observables with CombineLatest. The combined observable re-emits whenever either source changes:

using System.Reactive.Linq;
using System.Reactive.Subjects;
public record CombinedData(IEnumerable<DebitAccount> Accounts, IEnumerable<Customer> Customers);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<CombinedData> GetCombinedDataObservable(
IMongoCollection<DebitAccount> accountCollection,
IMongoCollection<Customer> customerCollection)
{
var combined = new ReplaySubject<CombinedData>(1);
accountCollection.Observe()
.CombineLatest(customerCollection.Observe(),
(accounts, customers) => new CombinedData(accounts, customers))
.Subscribe(combined);
return combined;
}
}

Create observables that compute derived values by projecting each snapshot with Select. Every change to the underlying collection recomputes and re-emits the metrics:

using System.Reactive.Linq;
using System.Reactive.Subjects;
public record AccountMetrics(
int TotalAccounts,
decimal TotalBalance,
decimal AverageBalance,
int ActiveAccounts,
int HighValueAccounts);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<AccountMetrics> GetAccountMetricsObservable(IMongoCollection<DebitAccount> collection)
{
var metrics = new ReplaySubject<AccountMetrics>(1);
collection.Observe()
.Select(accounts => new AccountMetrics(
TotalAccounts: accounts.Count(),
TotalBalance: accounts.Sum(a => a.Balance),
AverageBalance: accounts.Any() ? accounts.Average(a => a.Balance) : 0,
ActiveAccounts: accounts.Count(a => a.Balance > 0),
HighValueAccounts: accounts.Count(a => a.Balance > 100000)))
.Subscribe(metrics);
return metrics;
}
}

Filtered Observables with Dynamic Criteria

Section titled “Filtered Observables with Dynamic Criteria”

Allow clients to specify filter criteria for observables. Observe the whole collection and apply the dynamic predicate to each emitted snapshot with Select, so the filtered results stay current as the data changes:

using System.Reactive.Linq;
using System.Reactive.Subjects;
public record ObservableFilter(
decimal? MinBalance,
decimal? MaxBalance,
string? NamePattern,
CustomerId? OwnerId);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetFilteredObservable(
ObservableFilter filter,
IMongoCollection<DebitAccount> collection)
{
var predicate = BuildFilterPredicate(filter);
var filtered = new ReplaySubject<IEnumerable<DebitAccount>>(1);
collection.Observe()
.Select(accounts => accounts.Where(predicate).ToList().AsEnumerable())
.Subscribe(filtered);
return filtered;
}
static Func<DebitAccount, bool> BuildFilterPredicate(ObservableFilter filter)
{
return account =>
(!filter.MinBalance.HasValue || account.Balance >= filter.MinBalance.Value) &&
(!filter.MaxBalance.HasValue || account.Balance <= filter.MaxBalance.Value) &&
(string.IsNullOrEmpty(filter.NamePattern) || account.Name.Contains(filter.NamePattern, StringComparison.OrdinalIgnoreCase)) &&
(!filter.OwnerId.HasValue || account.Owner == filter.OwnerId.Value);
}
}

For high-frequency changes, use the System.Reactive Sample operator to emit at most one update per time window, preventing a flood of changes from overwhelming clients:

using System.Reactive.Linq;
using System.Reactive.Subjects;
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetThrottledObservable(
IMongoCollection<DebitAccount> collection,
int throttleMs = 1000)
{
var throttled = new ReplaySubject<IEnumerable<DebitAccount>>(1);
collection.Observe()
.Sample(TimeSpan.FromMilliseconds(throttleMs))
.Subscribe(throttled);
return throttled;
}
}

Errors raised while observing the collection propagate through the observable’s error channel automatically. Use the System.Reactive Do operator to log them as they flow through, without altering the stream:

using System.Reactive.Linq;
using System.Reactive.Subjects;
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetRobustObservable(
IMongoCollection<DebitAccount> collection,
ILogger<DebitAccount> logger)
{
var observable = new ReplaySubject<IEnumerable<DebitAccount>>(1);
collection.Observe()
.Do(
onNext: _ => { },
onError: ex => logger.LogError(ex, "Error observing accounts"))
.Subscribe(observable);
return observable;
}
}

Observable queries support the same authentication and authorization as regular queries:

[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
[Authorize]
public static ISubject<IEnumerable<DebitAccount>> GetSecureObservable(IMongoCollection<DebitAccount> collection)
{
// Only authenticated users can subscribe
return collection.Observe();
}
[Authorize(Roles = "Admin")]
public static ISubject<IEnumerable<DebitAccount>> GetAdminObservable(IMongoCollection<DebitAccount> collection)
{
// Only admin users can subscribe
return collection.Observe();
}
}

Model-bound observable queries support the same dependency injection patterns as regular model-bound queries:

using System.Reactive.Linq;
using System.Reactive.Subjects;
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAccountsWithBusinessLogic(
IMongoCollection<DebitAccount> collection,
IAccountValidator validator)
{
var valid = new ReplaySubject<IEnumerable<DebitAccount>>(1);
collection.Observe()
.Select(accounts => accounts.Where(validator.IsValid).ToList().AsEnumerable())
.Subscribe(valid);
return valid;
}
}

Best Practices for Model-Bound Observable Queries

Section titled “Best Practices for Model-Bound Observable Queries”
  1. Prefer the Observe() / ObserveSingle() extension methods — they handle change monitoring, initial data, and cleanup for you
  2. Project with System.Reactive operators (Select, CombineLatest, Sample) when you need computed, combined, or throttled streams
  3. Use appropriate filters to minimize unnecessary data transmission
  4. Consider throttling with Sample for high-frequency changes to prevent overwhelming clients
  5. Let errors propagate through the observable’s error channel; use Do to observe them for logging
  6. Use authentication to control who can subscribe to observable endpoints
  7. Monitor performance and consider the impact of many concurrent subscriptions
  8. Keep observable methods static and follow the same patterns as regular model-bound queries

Observable queries integrate seamlessly with frontend frameworks through the proxy generator and the ObservableQuery construct:

accountsObservable.subscribe(accounts => {
// Handle real-time account updates
updateUI(accounts);
});

The ISubject<T> return type automatically establishes and manages WebSocket connections, providing:

Important: The Observe() and ObserveSingle() extension methods manage their own subscriptions and clean up automatically when a client disconnects, so you do not need to write any teardown code.

  • Automatic connection management - WebSocket connections are established and maintained automatically
  • Strongly-typed data flow - Full TypeScript support through the proxy generator
  • Reactive integration - Seamless integration with React hooks like useObservableQuery()
  • Reconnection handling - Automatic reconnection and state recovery on connection loss

Note: The proxy generator automatically creates TypeScript types for your observable queries, making them strongly typed on the frontend as well.

The generated HTTP GET endpoint for an observable query returns the current snapshot as JSON. If the observable does not have a current value yet, add waitForFirstResult=true to keep the HTTP request open until the first item is produced.

Arc applies a timeout while waiting. By default the timeout is 30 seconds. You can override it with waitForFirstResultTimeout, expressed in seconds.

Terminal window
curl "https://localhost:5001/api/debit-account/observe-single?waitForFirstResult=true"
Terminal window
curl "https://localhost:5001/api/debit-account/observe-single?waitForFirstResult=true&waitForFirstResultTimeout=10"

This is useful when debugging observable queries with cURL or any other plain HTTP client and you want the request to block until the first result is available.

See Use Observable Queries with cURL for snapshot, SSE, and long-polling workflows.

Arc automatically handles WebSocket connections for observable queries:

  • Connection establishment - Automatic WebSocket upgrade for observable endpoints
  • Message serialization - Automatic JSON serialization of observable data
  • Connection cleanup - Proper disposal of resources when clients disconnect
  • Reconnection handling - Clients can reconnect and resume subscriptions

The same connection management applies whether using controller-based or model-bound approaches.