---
title: Observable Query Demultiplexer
---

The Observable Query Demultiplexer is a composite real-time streaming endpoint that multiplexes multiple observable query subscriptions over a single persistent connection.

Rather than creating one WebSocket or SSE connection per query, the demultiplexer provides **two fixed, well-known endpoints** — one for WebSocket and one for Server-Sent Events — that all observable queries in an application route through.

## Endpoints

| Transport | Endpoint | Direction |
|-----------|----------|-----------|
| WebSocket | `/.cratis/queries/ws` | Bidirectional — subscribe, unsubscribe, ping/pong |
| Server-Sent Events (SSE) | `/.cratis/queries/sse` | Server → client; one query per connection |

Both endpoints are registered automatically by `UseCratisArc()` and require no manual configuration.

## Why a Composite Demultiplexer?

Individual per-query WebSocket endpoints work but come with drawbacks at scale:

- Each browser tab opens a separate WebSocket per observable query.
- HTTP/1.1 limits the number of concurrent connections per origin.
- SSE has the same constraint and typically falls back to polling when the connection limit is reached.

The demultiplexer solves this by allowing a single WebSocket to carry updates for many queries simultaneously, and by providing a single, predictable SSE endpoint that clients can connect to for any query.

## Protocol

All messages exchanged over the demultiplexer share a common envelope:

```json
{
  "type": "<ObservableQueryHubMessageType>",
  "queryId": "<client-assigned-id>",
  "payload": { ... },
  "timestamp": 1234567890
}
```

| Field | Description |
|-------|-------------|
| `type` | One of the message types listed below. |
| `queryId` | Client-assigned identifier that correlates subscriptions with their result updates. Must be unique per subscription within a connection. |
| `payload` | Depends on `type` — see the table below. |
| `timestamp` | Unix milliseconds. Only populated for `ping` / `pong`. |

### Message Types

| Type | Direction | Payload |
|------|-----------|---------|
| `subscribe` (0) | Client → Server | `ObservableQuerySubscriptionRequest` |
| `unsubscribe` (1) | Client → Server | *(none)* |
| `queryResult` (2) | Server → Client | `QueryResult` |
| `unauthorized` (3) | Server → Client | *(none)* |
| `error` (4) | Server → Client | Error message string |
| `ping` (5) | Client → Server | *(timestamp only)* |
| `pong` (6) | Server → Client | *(timestamp echoed from ping)* |

### Subscribe Payload — `ObservableQuerySubscriptionRequest`

```json
{
  "queryName": "MyApp.Authors.Listing.AllAuthors",
  "arguments": { "filter": "active" },
  "page": 0,
  "pageSize": 25,
  "sortBy": "name",
  "sortDirection": "asc"
}
```

| Field | Required | Description |
|-------|----------|-------------|
| `queryName` | ✅ | Fully qualified name of the observable query method (e.g. `MyApp.Features.Authors.Listing.AllAuthors`). |
| `arguments` | ☐ | Query-string arguments forwarded to the query performer. |
| `page` | ☐ | Zero-based page index for paged queries. |
| `pageSize` | ☐ | Number of items per page. |
| `sortBy` | ☐ | Field name to sort by (case-insensitive). |
| `sortDirection` | ☐ | `asc` or `desc`. |

## WebSocket Transport

Connect to `/.cratis/queries/ws` and send `subscribe` messages to start receiving updates.

### Subscribe

```json
{
  "type": 0,
  "queryId": "authors-list",
  "payload": {
    "queryName": "MyApp.Authors.Listing.AllAuthors"
  }
}
```

The server responds with one or more `queryResult` messages whenever the underlying data changes:

```json
{
  "type": 2,
  "queryId": "authors-list",
  "payload": {
    "isSuccess": true,
    "isAuthorized": true,
    "data": [ ... ],
    "validationResults": []
  }
}
```

### Unsubscribe

```json
{
  "type": 1,
  "queryId": "authors-list"
}
```

### Keep-alive (Ping / Pong)

Send a `ping` with the current Unix timestamp; the server echoes it as a `pong` with the same timestamp for round-trip latency measurement.

```json
{ "type": 5, "timestamp": 1740000000000 }
// Server responds:
{ "type": 6, "timestamp": 1740000000000 }
```

### Unauthorized

If the current user is not authorized to access the requested query, the server sends an `unauthorized` message and no data stream is established:

```json
{
  "type": 3,
  "queryId": "authors-list"
}
```

## SSE Transport

Connect to `/.cratis/queries/sse` using the `EventSource` API. Pass the fully qualified query name in the `query` query-string parameter. All other query-string parameters are forwarded as query arguments.

```http
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&filter=active
```

The server responds with the standard SSE content type (`text/event-stream`) and streams `data:` frames containing serialized `ObservableQueryHubMessage` envelopes whenever the underlying data changes.

Each SSE connection carries a **single query subscription**. To observe multiple queries simultaneously via SSE, open multiple `EventSource` connections — or use the WebSocket transport which multiplexes all subscriptions over one connection.

### Paging and Sorting via SSE

Pass paging and sorting directly as query-string parameters:

```http
GET /.cratis/queries/sse?query=MyApp.Authors.Listing.AllAuthors&page=0&pageSize=20&sortBy=name&sortDirection=asc
```

## Authorization

Authorization is enforced for every subscription through the standard query pipeline, including all registered `IQueryFilter` implementations.

- If the query performer has an `[Authorize]` attribute and the current user is not authenticated or lacks the required role, the subscription is rejected with an `unauthorized` message.
- If the query allows anonymous access (`[AllowAnonymous]`), the subscription is accepted regardless of authentication state.
- Authorization is re-evaluated on every new subscription, not cached for the lifetime of the connection.

## Keep-alive

Both WebSocket and SSE transports send automatic keep-alive messages to prevent idle connections from being closed by proxies or firewalls.

The server sends a `ping` message only when no other message has been sent within the configured interval. If data is flowing normally (frequent `queryResult` messages), the keep-alive is suppressed — it fires only during periods of inactivity.

### Configuration

Configure the keep-alive interval in `ArcOptions.Query`:

```csharp
builder.Services.Configure<ArcOptions>(options =>
{
    options.Query.KeepAliveInterval = TimeSpan.FromSeconds(30); // default
});
```

Or inline when calling `AddCratisArc()`:

```csharp
builder.AddCratisArc(options =>
{
    options.Query.KeepAliveInterval = TimeSpan.FromSeconds(45);
});
```

Set `KeepAliveInterval` to `TimeSpan.Zero` or a negative value to disable keep-alive entirely.

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `Query.KeepAliveInterval` | `TimeSpan` | 30 seconds | How often to send a keep-alive ping when no data is flowing. |

## See also

- [Observable Queries (model-bound)](/arc/backend/queries/model-bound/observable-queries/) — How to expose observable queries in the backend.
- [Query Pipeline](/arc/backend/queries/query-pipeline/) — How the query pipeline works, including filter hooks.
- [Authorization](/arc/backend/core/authorization/) — Role-based authorization for queries and commands.
- [Frontend: Observable Query Multiplexing](/arc/frontend/react/queries/observable-query-multiplexing/) — How to configure the frontend to use the hub.
