Observable Queries
Observable queries provide real-time data streaming using WebSockets, enabling reactive user experiences where data changes are pushed to clients as they occur. You achieve this by returning ISubject<T> from your controller actions.
The ISubject<T> return type automatically establishes a WebSocket connection between the server and client, enabling real-time data updates. This integrates seamlessly with the ObservableQuery construct in the frontend through the proxy generator, creating strongly-typed reactive data flows.
Basic Observable Query
The key to an observable query is to leverage the ClientObservable<T> generic type.
/// <summary>
/// Observable query that publishes the result of <c>GetCurrentAccounts()</c> as its
/// first value and then hands the subject to <c>SetupDataChangeNotifications</c>.
/// </summary>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("observable")]
public ISubject<IEnumerable<DebitAccount>> AllAccountsObservable()
{
    var subject = new ClientObservable<IEnumerable<DebitAccount>>();

    // Publish the initial value before any change notifications are wired up.
    subject.OnNext(GetCurrentAccounts());

    // Real-time updates; the actual mechanism depends on the data source.
    SetupDataChangeNotifications(subject);

    return subject;
}
Observable with Arguments
Observable queries can accept arguments just like regular queries:
/// <summary>
/// Observable query scoped to one owner: publishes <c>GetAccountsByOwner(ownerId)</c>
/// as the first value and registers change notifications for the same owner.
/// </summary>
/// <param name="ownerId">Owner identifier taken from the route.</param>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("owner/{ownerId}/observable")]
public ISubject<IEnumerable<DebitAccount>> GetAccountsByOwnerObservable(CustomerId ownerId)
{
    var subject = new ClientObservable<IEnumerable<DebitAccount>>();

    // Initial value: the owner's accounts as returned right now.
    subject.OnNext(GetAccountsByOwner(ownerId));

    // Change notifications are set up with the same owner id as the initial read.
    SetupFilteredDataChangeNotifications(subject, ownerId);

    return subject;
}
/// <summary>
/// Observable query with an optional minimum-balance argument read from the query string.
/// </summary>
/// <param name="minBalance">
/// When supplied, the initial value comes from <c>GetAccountsAboveBalance</c>;
/// when omitted, from <c>GetAllAccounts</c>.
/// </param>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("filtered-observable")]
public ISubject<IEnumerable<DebitAccount>> GetFilteredAccountsObservable(
    [FromQuery] decimal? minBalance = null)
{
    var subject = new ClientObservable<IEnumerable<DebitAccount>>();

    // Pick the initial data set depending on whether a threshold was given.
    var initialAccounts = minBalance is decimal threshold
        ? GetAccountsAboveBalance(threshold)
        : GetAllAccounts();
    subject.OnNext(initialAccounts);

    // The (possibly null) threshold is forwarded to the notification setup as well.
    SetupFilteredDataChangeNotifications(subject, minBalance);

    return subject;
}
Single Object Observable
For observing changes to a single object:
/// <summary>
/// Observable query for a single account.
/// </summary>
/// <param name="id">Identifier of the account, taken from the route.</param>
/// <returns>
/// The subject the client subscribes to. No initial value is published when
/// <c>GetAccountById</c> returns <c>null</c>.
/// </returns>
[HttpGet("{id}/observable")]
public ISubject<DebitAccount> GetAccountObservable(AccountId id)
{
    var subject = new ClientObservable<DebitAccount>();

    // Only publish an initial value when the lookup found something.
    if (GetAccountById(id) is { } existing)
    {
        subject.OnNext(existing);
    }

    // Notifications are registered regardless of whether the account was found.
    SetupSingleObjectChangeNotifications(subject, id);

    return subject;
}
Custom Observable Logic
For more complex scenarios, you can implement custom observable logic using ClientObservable<T>:
/// <summary>
/// Observable query that publishes an <c>AccountSummary</c> (account count and total
/// balance) and republishes it whenever the change-notification callback fires.
/// </summary>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("summary")]
public ISubject<AccountSummary> GetAccountSummaryObservable()
{
    var observable = new ClientObservable<AccountSummary>();

    var calculateSummary = () =>
    {
        // Materialize once: counting and summing a lazy sequence separately would
        // enumerate the source twice, and the two figures could come from
        // different snapshots of the data.
        var accounts = GetAllAccounts().ToList();
        return new AccountSummary(accounts.Count, accounts.Sum(a => a.Balance));
    };

    // Send initial summary
    observable.OnNext(calculateSummary());

    // Set up change notifications for computed data.
    // Implementation depends on your data source and change notification mechanism.
    var changeToken = SetupDataChangeNotifications(() =>
    {
        observable.OnNext(calculateSummary());
    });

    // Clean up when client disconnects
    observable.ClientDisconnected = () => changeToken?.Dispose();

    return observable;
}
Multiple Data Source Observables
Observe changes across multiple data sources:
/// <summary>
/// Payload that carries an account list and a customer list together in one message.
/// </summary>
/// <param name="Accounts">The accounts included in the message.</param>
/// <param name="Customers">The customers included in the message.</param>
public record CombinedData(IEnumerable<DebitAccount> Accounts, IEnumerable<Customer> Customers);
/// <summary>
/// Observable query that publishes accounts and customers together as one
/// <c>CombinedData</c> value, and republishes when either change notification fires.
/// </summary>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("combined-observable")]
public ISubject<CombinedData> GetCombinedDataObservable()
{
    var subject = new ClientObservable<CombinedData>();

    // Reads accounts first, then customers, and publishes them as a single value.
    var publishSnapshot = () =>
    {
        var currentAccounts = GetAllAccounts();
        var currentCustomers = GetAllCustomers();
        subject.OnNext(new CombinedData(currentAccounts, currentCustomers));
    };

    // Initial value.
    publishSnapshot();

    // The same callback is registered for both data sources.
    var accountsToken = SetupAccountChangeNotifications(publishSnapshot);
    var customersToken = SetupCustomerChangeNotifications(publishSnapshot);

    // Release both registrations when the client goes away.
    subject.ClientDisconnected = () =>
    {
        accountsToken?.Dispose();
        customersToken?.Dispose();
    };

    return subject;
}
Observable with Computed Values
Create observables that compute derived values:
/// <summary>
/// Observable query that publishes derived <c>AccountMetrics</c> and recomputes them
/// whenever the change-notification callback fires.
/// </summary>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("computed-metrics")]
public ISubject<AccountMetrics> GetAccountMetricsObservable()
{
    var observable = new ClientObservable<AccountMetrics>();

    var computeMetrics = () =>
    {
        // Materialize once: the metrics below would otherwise enumerate the source
        // up to five times, re-running the underlying query each time and possibly
        // mixing figures from different snapshots.
        var accounts = GetAllAccounts().ToList();

        return new AccountMetrics(
            TotalAccounts: accounts.Count,
            TotalBalance: accounts.Sum(a => a.Balance),
            // Average throws on an empty sequence, so guard it explicitly.
            AverageBalance: accounts.Count > 0 ? accounts.Average(a => a.Balance) : 0,
            ActiveAccounts: accounts.Count(a => a.Balance > 0),
            HighValueAccounts: accounts.Count(a => a.Balance > 100000)
        );
    };

    // Send initial metrics
    observable.OnNext(computeMetrics());

    // Set up change notifications and recompute when data changes
    var changeToken = SetupDataChangeNotifications(() =>
    {
        observable.OnNext(computeMetrics());
    });

    // Clean up when client disconnects
    observable.ClientDisconnected = () => changeToken?.Dispose();

    return observable;
}
Filtered Observables with Dynamic Criteria
Allow clients to specify filter criteria for observables:
/// <summary>
/// Client-supplied filter criteria for an observable account query.
/// A criterion that is <c>null</c> (or, for <c>NamePattern</c>, empty) is not applied.
/// </summary>
/// <param name="MinBalance">Inclusive lower bound on the account balance.</param>
/// <param name="MaxBalance">Inclusive upper bound on the account balance.</param>
/// <param name="NamePattern">Text the account name must contain, compared case-insensitively.</param>
/// <param name="OwnerId">Owner the account must belong to.</param>
public record ObservableFilter(
decimal? MinBalance,
decimal? MaxBalance,
string? NamePattern,
CustomerId? OwnerId);
/// <summary>
/// Observable query whose filter criteria are supplied by the client in the request body.
/// </summary>
/// <param name="filter">Criteria turned into a predicate by <c>BuildFilterPredicate</c>.</param>
/// <returns>The subject the client subscribes to.</returns>
[HttpPost("filtered-observable")]
public ISubject<IEnumerable<DebitAccount>> GetFilteredObservable([FromBody] ObservableFilter filter)
{
    var subject = new ClientObservable<IEnumerable<DebitAccount>>();

    // The predicate is built once and reused for every publication.
    var matches = BuildFilterPredicate(filter);

    var publishMatches = () =>
    {
        subject.OnNext(GetAllAccounts().Where(matches));
    };

    // Initial value.
    publishMatches();

    // The notification setup receives the same filter as the initial read.
    var registration = SetupFilteredDataChangeNotifications(publishMatches, filter);
    subject.ClientDisconnected = () => registration?.Dispose();

    return subject;
}
/// <summary>
/// Builds a predicate that accepts an account only when it satisfies every criterion
/// set on <paramref name="filter"/>; criteria that are not set are skipped.
/// </summary>
/// <param name="filter">The criteria to translate.</param>
/// <returns>A predicate over <c>DebitAccount</c>.</returns>
private Func<DebitAccount, bool> BuildFilterPredicate(ObservableFilter filter)
{
    // Checks run in this order and short-circuit on the first failure.
    return account =>
        AboveMinimum(account) &&
        BelowMaximum(account) &&
        NameMatches(account) &&
        OwnedByRequestedOwner(account);

    // Balance is at or above the minimum, when a minimum is set.
    bool AboveMinimum(DebitAccount candidate) =>
        !filter.MinBalance.HasValue || candidate.Balance >= filter.MinBalance.Value;

    // Balance is at or below the maximum, when a maximum is set.
    bool BelowMaximum(DebitAccount candidate) =>
        !filter.MaxBalance.HasValue || candidate.Balance <= filter.MaxBalance.Value;

    // Name contains the pattern (case-insensitive), when a non-empty pattern is set.
    bool NameMatches(DebitAccount candidate) =>
        string.IsNullOrEmpty(filter.NamePattern) ||
        candidate.Name.Contains(filter.NamePattern, StringComparison.OrdinalIgnoreCase);

    // Owner equals the requested owner, when an owner is set.
    bool OwnedByRequestedOwner(DebitAccount candidate) =>
        !filter.OwnerId.HasValue || candidate.Owner == filter.OwnerId.Value;
}
Throttled Observables
For high-frequency changes, implement throttling to prevent overwhelming clients:
/// <summary>
/// Observable query that coalesces bursts of change notifications: after the first
/// change in a burst, one update is published once the throttle interval elapses.
/// </summary>
/// <param name="throttleMs">
/// Throttle interval in milliseconds. Values below 1 are raised to 1, because
/// <c>System.Timers.Timer</c> rejects non-positive intervals.
/// </param>
/// <returns>The subject the client subscribes to.</returns>
[HttpGet("throttled-observable")]
public ISubject<IEnumerable<DebitAccount>> GetThrottledObservable(
    [FromQuery] int throttleMs = 1000)
{
    var observable = new ClientObservable<IEnumerable<DebitAccount>>();

    // AutoReset = false makes the timer one-shot: it fires once per Start() and
    // needs no explicit Stop() inside the handler.
    var throttleTimer = new System.Timers.Timer(Math.Max(1, throttleMs))
    {
        AutoReset = false
    };
    var pendingUpdate = false;

    var sendUpdate = () =>
    {
        // Clear the flag BEFORE reading, so a change that arrives while the data is
        // being read schedules another update instead of being dropped.
        pendingUpdate = false;
        observable.OnNext(GetAllAccounts());
    };

    // Subscribe exactly once. Adding the handler inside the change callback would
    // register one more handler per throttle cycle and publish duplicate updates.
    throttleTimer.Elapsed += (_, _) => sendUpdate();

    // Send initial data
    sendUpdate();

    // Set up throttled change notifications
    var changeToken = SetupDataChangeNotifications(() =>
    {
        // NOTE(review): which thread this callback runs on is not visible here;
        // if it can race with the timer thread, guard the flag with a lock.
        if (!pendingUpdate)
        {
            pendingUpdate = true;
            throttleTimer.Start();
        }
    });

    observable.ClientDisconnected = () =>
    {
        // Stop notifications first so nothing restarts the timer after disposal.
        changeToken?.Dispose();
        throttleTimer.Dispose();
    };

    return observable;
}
Error Handling in Observables
Implement proper error handling for robust observables:
/// <summary>
/// Observable query with error handling at three levels: while publishing data,
/// inside the change-notification callback, and during initial setup. Each failure
/// is logged and forwarded to the subscriber through <c>OnError</c>.
/// </summary>
/// <returns>The subject the client subscribes to, even when setup failed.</returns>
[HttpGet("robust-observable")]
public ISubject<IEnumerable<DebitAccount>> GetRobustObservable()
{
    var subject = new ClientObservable<IEnumerable<DebitAccount>>();

    try
    {
        // Initial value.
        PublishAccounts();

        // Change notifications; the callback has its own guard around the publish.
        var registration = SetupDataChangeNotifications(() =>
        {
            try
            {
                PublishAccounts();
            }
            catch (Exception notificationFailure)
            {
                _logger.LogError(notificationFailure, "Error in observable change notification");
                subject.OnError(notificationFailure);
            }
        });

        subject.ClientDisconnected = () => registration?.Dispose();
    }
    catch (Exception setupFailure)
    {
        _logger.LogError(setupFailure, "Error initializing observable");
        subject.OnError(setupFailure);
    }

    return subject;

    // Reads all accounts and publishes them; a failure is logged and sent to the subscriber.
    void PublishAccounts()
    {
        try
        {
            var current = GetAllAccounts();
            subject.OnNext(current);
        }
        catch (Exception publishFailure)
        {
            _logger.LogError(publishFailure, "Error updating observable accounts");
            subject.OnError(publishFailure);
        }
    }
}
Authentication and Authorization
Observable queries support the same authentication and authorization as regular queries:
/// <summary>
/// Observable query restricted by <c>[Authorize]</c>: only authenticated users can subscribe.
/// </summary>
/// <returns>The subject returned by <c>_collection.Observe()</c>.</returns>
[Authorize]
[HttpGet("secure-observable")]
public ISubject<IEnumerable<DebitAccount>> GetSecureObservable() =>
    _collection.Observe();
/// <summary>
/// Observable query restricted to the <c>Admin</c> role: only admin users can subscribe.
/// </summary>
/// <returns>The subject returned by <c>_collection.Observe()</c>.</returns>
[Authorize(Roles = "Admin")]
[HttpGet("admin-observable")]
public ISubject<IEnumerable<DebitAccount>> GetAdminObservable() =>
    _collection.Observe();
Best Practices for Observable Queries
- Always handle client disconnection with the ClientDisconnected callback when using ClientObservable<T> directly
- Send initial data immediately before setting up change monitoring
- Use appropriate filters to minimize unnecessary data transmission
- Consider throttling for high-frequency changes to prevent overwhelming clients
- Implement error handling to gracefully handle data source connection issues
- Clean up resources properly when clients disconnect
- Use authentication to control who can subscribe to observable endpoints
- Monitor performance and consider the impact of many concurrent subscriptions
Connection Management
Arc automatically handles WebSocket connections for observable queries:
- Connection establishment - Automatic WebSocket upgrade for observable endpoints
- Message serialization - Automatic JSON serialization of observable data
- Connection cleanup - Proper disposal of resources when clients disconnect
- Reconnection handling - Clients can reconnect and resume subscriptions
Frontend Integration
Observable queries integrate seamlessly with frontend frameworks through the proxy generator and the ObservableQuery construct:
// The generated TypeScript proxy takes care of the WebSocket connection.
const liveAccounts = await accountsProxy.getAllAccountsObservable();

// Each value pushed by the server is passed to the UI update.
liveAccounts.subscribe(latestAccounts => {
    updateUI(latestAccounts);
});
The ISubject<T> return type automatically establishes and manages WebSocket connections, providing:
- Automatic connection management - WebSocket connections are established and maintained automatically
- Strongly-typed data flow - Full TypeScript support through the proxy generator
- Reactive integration - Seamless integration with React hooks like useObservableQuery()
- Reconnection handling - Automatic reconnection and state recovery on connection loss
Important: When using ClientObservable<T> directly, the ClientDisconnected callback is essential for cleaning up resources to prevent memory leaks.
Note: The proxy generator automatically creates TypeScript types for your observable queries, making them strongly typed on the frontend as well.