Observable Queries
Observable queries in model-bound scenarios provide the same real-time data streaming capabilities as controller-based queries, but implemented as static methods directly on your read model records. You achieve this by returning ISubject<T> from static methods on your [ReadModel] decorated record.
The ISubject<T> return type automatically establishes a WebSocket connection between the server and client, enabling real-time data updates. This integrates seamlessly with the ObservableQuery construct in the frontend through the proxy generator, creating strongly-typed reactive data flows.
Basic Observable Query
Define an observable query as a static method on your read model:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAllAccountsObservable(IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Send initial data
var accounts = GetAllAccounts(collection);
observable.OnNext(accounts);
// Set up real-time updates (implementation varies by data source)
SetupDataChangeNotifications(observable, collection);
return observable;
}
public static IEnumerable<DebitAccount> GetAllAccounts(IMongoCollection<DebitAccount> collection)
{
return collection.Find(_ => true).ToList();
}
}
Observable with Arguments
Observable queries can accept arguments just like regular queries:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAccountsByOwnerObservable(
CustomerId ownerId,
IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Send initial filtered data
var accounts = GetAccountsByOwner(ownerId, collection);
observable.OnNext(accounts);
// Set up filtered change notifications
SetupFilteredDataChangeNotifications(observable, collection, ownerId);
return observable;
}
public static ISubject<IEnumerable<DebitAccount>> GetFilteredAccountsObservable(
IMongoCollection<DebitAccount> collection,
decimal? minBalance = null)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Send initial filtered data
var accounts = minBalance.HasValue
? GetAccountsAboveBalance(minBalance.Value, collection)
: GetAllAccounts(collection);
observable.OnNext(accounts);
// Set up change notifications with filter
SetupFilteredDataChangeNotifications(observable, collection, minBalance);
return observable;
}
}
Single Object Observable
For observing changes to a single object:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<DebitAccount> GetAccountObservable(
AccountId id,
IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<DebitAccount>();
// Send initial object
var account = GetAccountById(id, collection);
if (account is not null)
{
observable.OnNext(account);
}
// Set up change notifications for this specific object
SetupSingleObjectChangeNotifications(observable, collection, id);
return observable;
}
}
Custom Observable Logic
For more complex scenarios, you can implement custom observable logic using ClientObservable<T>:
public record AccountSummary(int Count, decimal TotalBalance);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<AccountSummary> GetAccountSummaryObservable(IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<AccountSummary>();
var calculateSummary = () =>
{
var accounts = GetAllAccounts(collection);
return new AccountSummary(accounts.Count(), accounts.Sum(a => a.Balance));
};
// Send initial summary
observable.OnNext(calculateSummary());
// Set up change notifications for computed data
// Implementation depends on your data source and change notification mechanism
var changeToken = SetupDataChangeNotifications(collection, () =>
{
observable.OnNext(calculateSummary());
});
// Clean up when client disconnects
observable.ClientDisconnected = () => changeToken?.Dispose();
return observable;
}
}
Multiple Data Source Observables
Observe changes across multiple data sources by injecting multiple collections:
public record CombinedData(IEnumerable<DebitAccount> Accounts, IEnumerable<Customer> Customers);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<CombinedData> GetCombinedDataObservable(
IMongoCollection<DebitAccount> accountCollection,
IMongoCollection<Customer> customerCollection)
{
var observable = new ClientObservable<CombinedData>();
var sendUpdate = () =>
{
var accounts = GetAllAccounts(accountCollection);
var customers = GetAllCustomers(customerCollection);
observable.OnNext(new CombinedData(accounts, customers));
};
// Send initial data
sendUpdate();
// Set up change notifications for multiple data sources
var accountChangeToken = SetupAccountChangeNotifications(accountCollection, sendUpdate);
var customerChangeToken = SetupCustomerChangeNotifications(customerCollection, sendUpdate);
observable.ClientDisconnected = () =>
{
accountChangeToken?.Dispose();
customerChangeToken?.Dispose();
};
return observable;
}
}
Observable with Computed Values
Create observables that compute derived values:
public record AccountMetrics(
int TotalAccounts,
decimal TotalBalance,
decimal AverageBalance,
int ActiveAccounts,
int HighValueAccounts);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<AccountMetrics> GetAccountMetricsObservable(IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<AccountMetrics>();
var computeMetrics = () =>
{
var accounts = GetAllAccounts(collection);
return new AccountMetrics(
TotalAccounts: accounts.Count(),
TotalBalance: accounts.Sum(a => a.Balance),
AverageBalance: accounts.Any() ? accounts.Average(a => a.Balance) : 0,
ActiveAccounts: accounts.Count(a => a.Balance > 0),
HighValueAccounts: accounts.Count(a => a.Balance > 100000)
);
};
// Send initial metrics
observable.OnNext(computeMetrics());
// Set up change notifications and recompute when data changes
var changeToken = SetupDataChangeNotifications(collection, () =>
{
observable.OnNext(computeMetrics());
});
observable.ClientDisconnected = () => changeToken?.Dispose();
return observable;
}
}
Filtered Observables with Dynamic Criteria
Allow clients to specify filter criteria for observables:
public record ObservableFilter(
decimal? MinBalance,
decimal? MaxBalance,
string? NamePattern,
CustomerId? OwnerId);
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetFilteredObservable(
ObservableFilter filter,
IMongoCollection<DebitAccount> collection)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Build filter predicate
var predicate = BuildFilterPredicate(filter);
var sendFilteredData = () =>
{
var accounts = GetAllAccounts(collection).Where(predicate);
observable.OnNext(accounts);
};
// Send initial filtered data
sendFilteredData();
// Set up change notifications with the same filter
var changeToken = SetupFilteredDataChangeNotifications(collection, sendFilteredData, filter);
observable.ClientDisconnected = () => changeToken?.Dispose();
return observable;
}
private static Func<DebitAccount, bool> BuildFilterPredicate(ObservableFilter filter)
{
return account =>
(!filter.MinBalance.HasValue || account.Balance >= filter.MinBalance.Value) &&
(!filter.MaxBalance.HasValue || account.Balance <= filter.MaxBalance.Value) &&
(string.IsNullOrEmpty(filter.NamePattern) || account.Name.Contains(filter.NamePattern, StringComparison.OrdinalIgnoreCase)) &&
(!filter.OwnerId.HasValue || account.Owner == filter.OwnerId.Value);
}
}
Throttled Observables
For high-frequency changes, implement throttling to prevent overwhelming clients:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetThrottledObservable(
IMongoCollection<DebitAccount> collection,
int throttleMs = 1000)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
var throttleTimer = new System.Timers.Timer(throttleMs);
var pendingUpdate = false;
var sendUpdate = () =>
{
var accounts = GetAllAccounts(collection);
observable.OnNext(accounts);
pendingUpdate = false;
};
// Send initial data
sendUpdate();
// Set up throttled change notifications
var changeToken = SetupDataChangeNotifications(collection, () =>
{
if (!pendingUpdate)
{
pendingUpdate = true;
throttleTimer.Elapsed += (s, e) =>
{
sendUpdate();
throttleTimer.Stop();
};
throttleTimer.Start();
}
});
observable.ClientDisconnected = () =>
{
changeToken?.Dispose();
throttleTimer?.Dispose();
};
return observable;
}
}
Error Handling in Observables
Implement proper error handling for robust observables:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetRobustObservable(
IMongoCollection<DebitAccount> collection,
ILogger<DebitAccount> logger)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
try
{
var sendUpdate = () =>
{
try
{
var accounts = GetAllAccounts(collection);
observable.OnNext(accounts);
}
catch (Exception ex)
{
logger.LogError(ex, "Error updating observable accounts");
observable.OnError(ex);
}
};
// Send initial data
sendUpdate();
// Set up change notifications with error handling
var changeToken = SetupDataChangeNotifications(collection, () =>
{
try
{
sendUpdate();
}
catch (Exception ex)
{
logger.LogError(ex, "Error in observable change notification");
observable.OnError(ex);
}
});
observable.ClientDisconnected = () => changeToken?.Dispose();
}
catch (Exception ex)
{
logger.LogError(ex, "Error initializing observable");
observable.OnError(ex);
}
return observable;
}
}
Authentication and Authorization
Observable queries support the same authentication and authorization as regular queries:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
[Authorize]
public static ISubject<IEnumerable<DebitAccount>> GetSecureObservable(IMongoCollection<DebitAccount> collection)
{
// Only authenticated users can subscribe
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Implementation...
return observable;
}
[Authorize(Roles = "Admin")]
public static ISubject<IEnumerable<DebitAccount>> GetAdminObservable(IMongoCollection<DebitAccount> collection)
{
// Only admin users can subscribe
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
// Implementation...
return observable;
}
}
Dependency Injection
Model-bound observable queries support the same dependency injection patterns as regular model-bound queries:
[ReadModel]
public record DebitAccount(AccountId Id, AccountName Name, CustomerId Owner, decimal Balance)
{
public static ISubject<IEnumerable<DebitAccount>> GetAccountsWithBusinessLogic(
IMongoCollection<DebitAccount> collection,
IAccountValidator validator,
ILogger<DebitAccount> logger)
{
var observable = new ClientObservable<IEnumerable<DebitAccount>>();
var sendValidAccounts = () =>
{
try
{
var accounts = GetAllAccounts(collection);
var validAccounts = accounts.Where(validator.IsValid);
observable.OnNext(validAccounts);
}
catch (Exception ex)
{
logger.LogError(ex, "Error getting valid accounts");
observable.OnError(ex);
}
};
sendValidAccounts();
var changeToken = SetupDataChangeNotifications(collection, sendValidAccounts);
observable.ClientDisconnected = () => changeToken?.Dispose();
return observable;
}
}
Best Practices for Model-Bound Observable Queries
- Always handle client disconnection with the
ClientDisconnectedcallback when usingClientObservable<T>directly - Send initial data immediately before setting up change monitoring
- Use appropriate filters to minimize unnecessary data transmission
- Consider throttling for high-frequency changes to prevent overwhelming clients
- Implement error handling to gracefully handle data source connection issues
- Clean up resources properly when clients disconnect
- Use authentication to control who can subscribe to observable endpoints
- Monitor performance and consider the impact of many concurrent subscriptions
- Keep observable methods static and follow the same patterns as regular model-bound queries
Frontend Integration
Observable queries integrate seamlessly with frontend frameworks through the proxy generator and the ObservableQuery construct:
accountsObservable.subscribe(accounts => {
// Handle real-time account updates
updateUI(accounts);
});
The ISubject<T> return type automatically establishes and manages WebSocket connections, providing:
Important: When using
ClientObservable<T>directly, theClientDisconnectedcallback is essential for cleaning up resources to prevent memory leaks.
- Automatic connection management - WebSocket connections are established and maintained automatically
- Strongly-typed data flow - Full TypeScript support through the proxy generator
- Reactive integration - Seamless integration with React hooks like
useObservableQuery() - Reconnection handling - Automatic reconnection and state recovery on connection loss
Note: The proxy generator automatically creates TypeScript types for your observable queries, making them strongly typed on the frontend as well.
Waiting for the First HTTP Result
The generated HTTP GET endpoint for an observable query returns the current snapshot as JSON. If the observable does not have a current value yet, add waitForFirstResult=true to keep the HTTP request open until the first item is produced.
Arc applies a timeout while waiting. By default the timeout is 30 seconds. You can override it with waitForFirstResultTimeout, expressed in seconds.
curl "https://localhost:5001/api/debit-account/observe-single?waitForFirstResult=true"
curl "https://localhost:5001/api/debit-account/observe-single?waitForFirstResult=true&waitForFirstResultTimeout=10"
This is useful when debugging observable queries with cURL or any other plain HTTP client and you want the request to block until the first result is available.
See Use Observable Queries with cURL for snapshot, SSE, and long-polling workflows.
Connection Management
Arc automatically handles WebSocket connections for observable queries:
- Connection establishment - Automatic WebSocket upgrade for observable endpoints
- Message serialization - Automatic JSON serialization of observable data
- Connection cleanup - Proper disposal of resources when clients disconnect
- Reconnection handling - Clients can reconnect and resume subscriptions
The same connection management applies whether using controller-based or model-bound approaches.