Skip to content

Observable Query Demultiplexer

The Observable Query Demultiplexer is a composite real-time streaming endpoint that multiplexes multiple observable query subscriptions over a single persistent connection.

Rather than creating one WebSocket or SSE connection per query, the demultiplexer provides two fixed, well-known endpoints — one for WebSocket and one for Server-Sent Events — that all observable queries in an application route through.

TransportEndpointDirection
WebSocket/.cratis/queries/wsBidirectional — subscribe, unsubscribe, ping/pong
Server-Sent Events (SSE)/.cratis/queries/sseServer → client; one query per connection

Both endpoints are registered automatically by UseCratisArc() and require no manual configuration.

Individual per-query WebSocket endpoints work but come with drawbacks at scale:

  • Each browser tab opens a separate WebSocket per observable query.
  • HTTP/1.1 limits the number of concurrent connections per origin.
  • SSE has the same constraint and typically falls back to polling when the connection limit is reached.

The demultiplexer solves this by allowing a single WebSocket to carry updates for many queries simultaneously, and by providing a single, predictable SSE endpoint that clients can connect to for any query.

All messages exchanged over the demultiplexer share a common envelope:

{
"type": "<ObservableQueryHubMessageType>",
"queryId": "<client-assigned-id>",
"payload": { ... },
"timestamp": 1234567890
}
FieldDescription
typeOne of the message types listed below.
queryIdClient-assigned identifier that correlates subscriptions with their result updates. Must be unique per subscription within a connection.
payloadDepends on type — see the table below.
timestampUnix milliseconds. Only populated for ping / pong.
TypeDirectionPayload
subscribe (0)Client → ServerObservableQuerySubscriptionRequest
unsubscribe (1)Client → Server(none)
queryResult (2)Server → ClientQueryResult
unauthorized (3)Server → Client(none)
error (4)Server → ClientError message string
ping (5)Client → Server(timestamp only)
pong (6)Server → Client(timestamp echoed from ping)

Subscribe Payload — ObservableQuerySubscriptionRequest

Section titled “Subscribe Payload — ObservableQuerySubscriptionRequest”
{
"queryName": "MyApp.Authors.Listing.AllAuthors",
"arguments": { "filter": "active" },
"page": 0,
"pageSize": 25,
"sortBy": "name",
"sortDirection": "asc"
}
FieldRequiredDescription
queryNameFully qualified name of the observable query method (e.g. MyApp.Features.Authors.Listing.AllAuthors).
argumentsQuery-string arguments forwarded to the query performer.
pageZero-based page index for paged queries.
pageSizeNumber of items per page.
sortByField name to sort by (case-insensitive).
sortDirectionasc or desc.

Connect to /.cratis/queries/ws and send subscribe messages to start receiving updates.

{
"type": 0,
"queryId": "authors-list",
"payload": {
"queryName": "MyApp.Authors.Listing.AllAuthors"
}
}

The server responds with one or more queryResult messages whenever the underlying data changes:

{
"type": 2,
"queryId": "authors-list",
"payload": {
"isSuccess": true,
"isAuthorized": true,
"data": [ ... ],
"validationResults": []
}
}
{
"type": 1,
"queryId": "authors-list"
}

Send a ping with the current Unix timestamp; the server echoes it as a pong with the same timestamp for round-trip latency measurement.

{ "type": 5, "timestamp": 1740000000000 }
// Server responds:
{ "type": 6, "timestamp": 1740000000000 }

If the current user is not authorized to access the requested query, the server sends an unauthorized message and no data stream is established:

{
"type": 3,
"queryId": "authors-list"
}

Connect to /.cratis/queries/sse using the EventSource API. Pass the fully qualified query name in the query query-string parameter. All other query-string parameters are forwarded as query arguments.

GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&filter=active

The server responds with the standard SSE content type (text/event-stream) and streams data: frames containing serialized ObservableQueryHubMessage envelopes whenever the underlying data changes.

Each SSE connection carries a single query subscription. To observe multiple queries simultaneously via SSE, open multiple EventSource connections — or use the WebSocket transport which multiplexes all subscriptions over one connection.

Pass paging and sorting directly as query-string parameters:

GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&page=0&pageSize=20&sortBy=name&sortDirection=asc

Authorization is enforced for every subscription through the standard query pipeline, including all registered IQueryFilter implementations.

  • If the query performer has an [Authorize] attribute and the current user is not authenticated or lacks the required role, the subscription is rejected with an unauthorized message.
  • If the query allows anonymous access ([AllowAnonymous]), the subscription is accepted regardless of authentication state.
  • Authorization is re-evaluated on every new subscription, not cached for the lifetime of the connection.

Both WebSocket and SSE transports send automatic keep-alive messages to prevent idle connections from being closed by proxies or firewalls.

The server sends a ping message only when no other message has been sent within the configured interval. If data is flowing normally (frequent queryResult messages), the keep-alive is suppressed — it fires only during periods of inactivity.

Configure the keep-alive interval in ArcOptions.Query:

builder.Services.Configure<ArcOptions>(options =>
{
options.Query.KeepAliveInterval = TimeSpan.FromSeconds(30); // default
});

Or inline when calling AddCratisArc():

builder.AddCratisArc(options =>
{
options.Query.KeepAliveInterval = TimeSpan.FromSeconds(45);
});

Set KeepAliveInterval to TimeSpan.Zero or a negative value to disable keep-alive entirely.

PropertyTypeDefaultDescription
Query.KeepAliveIntervalTimeSpan30 secondsHow often to send a keep-alive ping when no data is flowing.