Observable Query Demultiplexer
The Observable Query Demultiplexer is a composite real-time streaming endpoint that multiplexes multiple observable query subscriptions over a single persistent connection.
Rather than creating one WebSocket or SSE connection per query, the demultiplexer provides two fixed, well-known endpoints — one for WebSocket and one for Server-Sent Events — that all observable queries in an application route through.
Endpoints
Section titled “Endpoints”| Transport | Endpoint | Direction |
|---|---|---|
| WebSocket | /.cratis/queries/ws | Bidirectional — subscribe, unsubscribe, ping/pong |
| Server-Sent Events (SSE) | /.cratis/queries/sse | Server → client; one query per connection |
Both endpoints are registered automatically by UseCratisArc() and require no manual configuration.
Why a Composite Demultiplexer?
Section titled “Why a Composite Demultiplexer?”Individual per-query WebSocket endpoints work but come with drawbacks at scale:
- Each browser tab opens a separate WebSocket per observable query.
- HTTP/1.1 limits the number of concurrent connections per origin.
- SSE has the same constraint and typically falls back to polling when the connection limit is reached.
The demultiplexer solves this by allowing a single WebSocket to carry updates for many queries simultaneously, and by providing a single, predictable SSE endpoint that clients can connect to for any query.
Protocol
Section titled “Protocol”All messages exchanged over the demultiplexer share a common envelope:
{ "type": "<ObservableQueryHubMessageType>", "queryId": "<client-assigned-id>", "payload": { ... }, "timestamp": 1234567890}| Field | Description |
|---|---|
type | One of the message types listed below. |
queryId | Client-assigned identifier that correlates subscriptions with their result updates. Must be unique per subscription within a connection. |
payload | Depends on type — see the table below. |
timestamp | Unix milliseconds. Only populated for ping / pong. |
Message Types
Section titled “Message Types”| Type | Direction | Payload |
|---|---|---|
subscribe (0) | Client → Server | ObservableQuerySubscriptionRequest |
unsubscribe (1) | Client → Server | (none) |
queryResult (2) | Server → Client | QueryResult |
unauthorized (3) | Server → Client | (none) |
error (4) | Server → Client | Error message string |
ping (5) | Client → Server | (timestamp only) |
pong (6) | Server → Client | (timestamp echoed from ping) |
Subscribe Payload — ObservableQuerySubscriptionRequest
Section titled “Subscribe Payload — ObservableQuerySubscriptionRequest”{ "queryName": "MyApp.Authors.Listing.AllAuthors", "arguments": { "filter": "active" }, "page": 0, "pageSize": 25, "sortBy": "name", "sortDirection": "asc"}| Field | Required | Description |
|---|---|---|
queryName | ✅ | Fully qualified name of the observable query method (e.g. MyApp.Features.Authors.Listing.AllAuthors). |
arguments | ☐ | Query-string arguments forwarded to the query performer. |
page | ☐ | Zero-based page index for paged queries. |
pageSize | ☐ | Number of items per page. |
sortBy | ☐ | Field name to sort by (case-insensitive). |
sortDirection | ☐ | asc or desc. |
WebSocket Transport
Section titled “WebSocket Transport”Connect to /.cratis/queries/ws and send subscribe messages to start receiving updates.
Subscribe
Section titled “Subscribe”{ "type": 0, "queryId": "authors-list", "payload": { "queryName": "MyApp.Authors.Listing.AllAuthors" }}The server responds with one or more queryResult messages whenever the underlying data changes:
{ "type": 2, "queryId": "authors-list", "payload": { "isSuccess": true, "isAuthorized": true, "data": [ ... ], "validationResults": [] }}Unsubscribe
Section titled “Unsubscribe”{ "type": 1, "queryId": "authors-list"}Keep-alive (Ping / Pong)
Section titled “Keep-alive (Ping / Pong)”Send a ping with the current Unix timestamp; the server echoes it as a pong with the same timestamp for round-trip latency measurement.
{ "type": 5, "timestamp": 1740000000000 }// Server responds:{ "type": 6, "timestamp": 1740000000000 }Unauthorized
Section titled “Unauthorized”If the current user is not authorized to access the requested query, the server sends an unauthorized message and no data stream is established:
{ "type": 3, "queryId": "authors-list"}SSE Transport
Section titled “SSE Transport”Connect to /.cratis/queries/sse using the EventSource API. Pass the fully qualified query name in the query query-string parameter. All other query-string parameters are forwarded as query arguments.
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&filter=activeThe server responds with the standard SSE content type (text/event-stream) and streams data: frames containing serialized ObservableQueryHubMessage envelopes whenever the underlying data changes.
Each SSE connection carries a single query subscription. To observe multiple queries simultaneously via SSE, open multiple EventSource connections — or use the WebSocket transport which multiplexes all subscriptions over one connection.
Paging and Sorting via SSE
Section titled “Paging and Sorting via SSE”Pass paging and sorting directly as query-string parameters:
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&page=0&pageSize=20&sortBy=name&sortDirection=ascAuthorization
Section titled “Authorization”Authorization is enforced for every subscription through the standard query pipeline, including all registered IQueryFilter implementations.
- If the query performer has an
[Authorize]attribute and the current user is not authenticated or lacks the required role, the subscription is rejected with anunauthorizedmessage. - If the query allows anonymous access (
[AllowAnonymous]), the subscription is accepted regardless of authentication state. - Authorization is re-evaluated on every new subscription, not cached for the lifetime of the connection.
Keep-alive
Section titled “Keep-alive”Both WebSocket and SSE transports send automatic keep-alive messages to prevent idle connections from being closed by proxies or firewalls.
The server sends a ping message only when no other message has been sent within the configured interval. If data is flowing normally (frequent queryResult messages), the keep-alive is suppressed — it fires only during periods of inactivity.
Configuration
Section titled “Configuration”Configure the keep-alive interval in ArcOptions.Query:
builder.Services.Configure<ArcOptions>(options =>{ options.Query.KeepAliveInterval = TimeSpan.FromSeconds(30); // default});Or inline when calling AddCratisArc():
builder.AddCratisArc(options =>{ options.Query.KeepAliveInterval = TimeSpan.FromSeconds(45);});Set KeepAliveInterval to TimeSpan.Zero or a negative value to disable keep-alive entirely.
| Property | Type | Default | Description |
|---|---|---|---|
Query.KeepAliveInterval | TimeSpan | 30 seconds | How often to send a keep-alive ping when no data is flowing. |
See also
Section titled “See also”- Observable Queries (model-bound) — How to expose observable queries in the backend.
- Query Pipeline — How the query pipeline works, including filter hooks.
- Authorization — Role-based authorization for queries and commands.
- Frontend: Observable Query Multiplexing — How to configure the frontend to use the hub.