Observable Query Demultiplexer
The Observable Query Demultiplexer is a composite real-time streaming endpoint that multiplexes multiple observable query subscriptions over a single persistent connection.
Rather than creating one WebSocket or SSE connection per query, the demultiplexer provides two fixed, well-known endpoints — one for WebSocket and one for Server-Sent Events — that all observable queries in an application route through.
Endpoints
| Transport | Endpoint | Direction |
|---|---|---|
| WebSocket | /.cratis/queries/ws |
Bidirectional — subscribe, unsubscribe, ping/pong |
| Server-Sent Events (SSE) | /.cratis/queries/sse |
Server → client; one query per connection |
Both endpoints are registered automatically by UseCratisArc() and require no manual configuration.
Why a Composite Demultiplexer?
Individual per-query WebSocket endpoints work but come with drawbacks at scale:
- Each browser tab opens a separate WebSocket per observable query.
- HTTP/1.1 limits the number of concurrent connections per origin.
- SSE has the same constraint and typically falls back to polling when the connection limit is reached.
The demultiplexer solves this by allowing a single WebSocket to carry updates for many queries simultaneously, and by providing a single, predictable SSE endpoint that clients can connect to for any query.
Protocol
All messages exchanged over the demultiplexer share a common envelope:
{
"type": "<ObservableQueryHubMessageType>",
"queryId": "<client-assigned-id>",
"payload": { ... },
"timestamp": 1234567890
}
| Field | Description |
|---|---|
type |
One of the message types listed below. |
queryId |
Client-assigned identifier that correlates subscriptions with their result updates. Must be unique per subscription within a connection. |
payload |
Depends on type — see the table below. |
timestamp |
Unix milliseconds. Only populated for ping / pong. |
Message Types
| Type | Direction | Payload |
|---|---|---|
subscribe (0) |
Client → Server | ObservableQuerySubscriptionRequest |
unsubscribe (1) |
Client → Server | (none) |
queryResult (2) |
Server → Client | QueryResult |
unauthorized (3) |
Server → Client | (none) |
error (4) |
Server → Client | Error message string |
ping (5) |
Client → Server | (timestamp only) |
pong (6) |
Server → Client | (timestamp echoed from ping) |
Subscribe Payload — ObservableQuerySubscriptionRequest
{
"queryName": "MyApp.Authors.Listing.AllAuthors",
"arguments": { "filter": "active" },
"page": 0,
"pageSize": 25,
"sortBy": "name",
"sortDirection": "asc"
}
| Field | Required | Description |
|---|---|---|
queryName |
✅ | Fully qualified name of the observable query method (e.g. MyApp.Features.Authors.Listing.AllAuthors). |
arguments |
☐ | Query-string arguments forwarded to the query performer. |
page |
☐ | Zero-based page index for paged queries. |
pageSize |
☐ | Number of items per page. |
sortBy |
☐ | Field name to sort by (case-insensitive). |
sortDirection |
☐ | asc or desc. |
WebSocket Transport
Connect to /.cratis/queries/ws and send subscribe messages to start receiving updates.
Subscribe
{
"type": 0,
"queryId": "authors-list",
"payload": {
"queryName": "MyApp.Authors.Listing.AllAuthors"
}
}
The server responds with one or more queryResult messages whenever the underlying data changes:
{
"type": 2,
"queryId": "authors-list",
"payload": {
"isSuccess": true,
"isAuthorized": true,
"data": [ ... ],
"validationResults": []
}
}
Unsubscribe
{
"type": 1,
"queryId": "authors-list"
}
Keep-alive (Ping / Pong)
Send a ping with the current Unix timestamp; the server echoes it as a pong with the same timestamp for round-trip latency measurement.
{ "type": 5, "timestamp": 1740000000000 }
// Server responds:
{ "type": 6, "timestamp": 1740000000000 }
Unauthorized
If the current user is not authorized to access the requested query, the server sends an unauthorized message and no data stream is established:
{
"type": 3,
"queryId": "authors-list"
}
SSE Transport
Connect to /.cratis/queries/sse using the EventSource API. Pass the fully qualified query name in the query query-string parameter. All other query-string parameters are forwarded as query arguments.
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&filter=active
The server responds with the standard SSE content type (text/event-stream) and streams data: frames containing serialized ObservableQueryHubMessage envelopes whenever the underlying data changes.
Each SSE connection carries a single query subscription. To observe multiple queries simultaneously via SSE, open multiple EventSource connections — or use the WebSocket transport which multiplexes all subscriptions over one connection.
Paging and Sorting via SSE
Pass paging and sorting directly as query-string parameters:
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&page=0&pageSize=20&sortBy=name&sortDirection=asc
Authorization
Authorization is enforced for every subscription through the standard query pipeline, including all registered IQueryFilter implementations.
- If the query performer has an
[Authorize]attribute and the current user is not authenticated or lacks the required role, the subscription is rejected with anunauthorizedmessage. - If the query allows anonymous access (
[AllowAnonymous]), the subscription is accepted regardless of authentication state. - Authorization is re-evaluated on every new subscription, not cached for the lifetime of the connection.
Keep-alive
Both WebSocket and SSE transports send automatic keep-alive messages to prevent idle connections from being closed by proxies or firewalls.
The server sends a ping message only when no other message has been sent within the configured interval. If data is flowing normally (frequent queryResult messages), the keep-alive is suppressed — it fires only during periods of inactivity.
Configuration
Configure the keep-alive interval in ArcOptions.Query:
builder.Services.Configure<ArcOptions>(options =>
{
options.Query.KeepAliveInterval = TimeSpan.FromSeconds(30); // default
});
Or inline when calling AddCratisArc():
builder.AddCratisArc(options =>
{
options.Query.KeepAliveInterval = TimeSpan.FromSeconds(45);
});
Set KeepAliveInterval to TimeSpan.Zero or a negative value to disable keep-alive entirely.
| Property | Type | Default | Description |
|---|---|---|---|
Query.KeepAliveInterval |
TimeSpan |
30 seconds | How often to send a keep-alive ping when no data is flowing. |
See also
- Observable Queries (model-bound) — How to expose observable queries in the backend.
- Query Pipeline — How the query pipeline works, including filter hooks.
- Authorization — Role-based authorization for queries and commands.
- Frontend: Observable Query Multiplexing — How to configure the frontend to use the hub.