4 changes: 4 additions & 0 deletions src/data/nav/aitransport.ts
@@ -33,6 +33,10 @@ export default {
name: 'Token streaming limits',
link: '/docs/ai-transport/token-streaming/token-rate-limits',
},
{
name: 'Publish from your server',
link: '/docs/ai-transport/token-streaming/server-publishing',
},
],
},
{
266 changes: 266 additions & 0 deletions src/pages/docs/ai-transport/token-streaming/server-publishing.mdx
@@ -0,0 +1,266 @@
---
title: Publish from your server
meta_description: "Learn how to publish AI response tokens from your server over a Realtime WebSocket connection, covering ordering, channel limits, and concurrent streams."
---

When streaming AI responses with [message per response](/docs/ai-transport/token-streaming/message-per-response) or [message per token](/docs/ai-transport/token-streaming/message-per-token), your server should publish tokens to Ably channels using a Realtime client. Realtime clients maintain persistent WebSocket connections to the Ably service, which provide the low-latency, ordered delivery needed for token streaming.

## Realtime connections <a id="realtime"/>

Use a Realtime client for server-side publishing with `message.append` or `message.create`. A Realtime client maintains a WebSocket connection to the Ably service, which provides low-latency publishing and guarantees that messages published on the same connection are delivered to subscribers in the order they were published. For more information, see [Realtime and REST](/docs/basics#realtime-and-rest).

While `publish()` and `appendMessage()` are available on both [REST](/docs/api/rest-sdk) and Realtime clients, REST does not guarantee [message ordering](/docs/platform/architecture/message-ordering) at high publish rates. Use a Realtime client when publishing at the rates typical of LLM token streaming.

Create a Realtime client on your server:

<Code>
```javascript
const ably = new Ably.Realtime({
  key: '{{API_KEY}}',
  echoMessages: false
});
```
```python
ably = AblyRealtime(
    key='{{API_KEY}}',
    echo_messages=False
)
```
```java
ClientOptions options = new ClientOptions();
options.key = "{{API_KEY}}";
options.echoMessages = false;
AblyRealtime ably = new AblyRealtime(options);
```
</Code>

<Aside data-type="note">
Set [`echoMessages`](/docs/api/realtime-sdk/types#client-options) to `false` on server-side clients to prevent the server from receiving its own published messages. This avoids unnecessary message delivery and billing for [echoed messages](/docs/pub-sub/advanced#echo).
</Aside>

## Message ordering <a id="ordering"/>

Ably guarantees that messages published on a single connection are delivered to subscribers in the order they were published. This ordering guarantee is essential for token streaming, because tokens must arrive in sequence for the final message to be correct.

This guarantee is per-connection. If you use [multiple connections](#multiple-streams) to handle higher concurrency, route all operations for a given channel through the same connection. This ensures that all `publish()` and `appendMessage()` calls for a given response maintain their order.

For more detail on how Ably preserves message order across its globally distributed infrastructure, see [message ordering](/docs/platform/architecture/message-ordering).
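One way to keep all operations for a channel on the same connection is to derive a fixed connection index from the channel name. The following is an illustrative sketch, not part of the Ably SDK; the function name is hypothetical:

<Code>
```javascript
// Illustrative sketch: pin each channel to a fixed connection index so that
// every publish and append for that channel goes through the same connection.
function connectionIndexFor(channelName, connectionCount) {
  let hash = 0;
  for (let i = 0; i < channelName.length; i++) {
    hash = ((hash << 5) - hash + channelName.charCodeAt(i)) | 0;
  }
  return Math.abs(hash) % connectionCount;
}

// The same channel name always maps to the same index, so per-channel
// publish order is preserved by the single underlying connection.
const index = connectionIndexFor('ai:session-123', 10);
```
</Code>

Because the mapping is deterministic, any part of your server that handles the same response stream selects the same connection without coordination.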

## Transient publishing and channel limits <a id="transient"/>

In a typical AI application, your server publishes responses to many distinct channels, often one per user session. When your server publishes to a channel without attaching first, the SDK uses a [transient publish](/docs/pub-sub/advanced#transient-publish). Transient publishes do not count toward the limit on the [number of channels per connection](/docs/platform/pricing/limits#connection).
<Aside data-type="note">
A client must attach to a channel in order to subscribe to it, in which case the SDK does not use transient publishing on that channel.
</Aside>

All message actions use the same transient publish path, including `publish()` and `appendMessage()`. This means a single connection can publish to thousands of distinct channels without hitting the channel limit. No additional configuration is required. When you call `publish()` or `appendMessage()` on a channel that the client has not explicitly attached to, the SDK handles the transient attachment automatically.

The constraint to be aware of is therefore the [per-connection inbound message rate](/docs/platform/pricing/limits#connection), not the number of channels: transient publishes do not hold channels attached on the connection, so the channel count is effectively unconstrained, while every published message still counts against the connection's inbound rate.
<Aside data-type="note">
If you also need to subscribe to channels on the same connection, those subscriptions require explicit attachment and will count toward the channel limit.
</Aside>

## Per-connection rate limits <a id="rate-limits"/>
Each connection has an [inbound message rate limit](/docs/platform/pricing/limits#connection) that caps how many messages per second can be published on that connection. How this limit interacts with your server's publish rate depends on which token streaming pattern you use.

### Message per response <a id="rate-limits-per-response"/>

With [message per response](/docs/ai-transport/token-streaming/message-per-response), your server calls `appendMessage()` at whatever rate the model produces tokens. Ably concatenates all appends received within a configurable time window, called the `appendRollupWindow`, into a single message before publishing it to subscribers. This is similar to [server-side batching](/docs/messages/batch#server-side), but happens at the edge of Ably's network where the connection is handled, before the message is published onwards. The connection's [inbound message rate limit](/docs/platform/pricing/limits#connection) applies to these rolled-up messages, not to individual `appendMessage()` calls.

For example, if your server publishes appends at 100 msg/s with a 40ms rollup window:

- 40ms window = 25 windows per second (1000ms / 40ms).
- Ably produces 25 rolled-up messages per second.
- Each rolled-up message contains approximately 4 tokens (100 / 25).
- The 25 msg/s rolled-up rate is what counts against the connection's inbound message rate limit.
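The calculation above can be sketched as a small helper. This is illustrative only; `rollupStats` is not part of the Ably SDK:

<Code>
```javascript
// Illustrative only: derive the rolled-up delivery rate and the average number
// of tokens per rolled-up message from an append rate and a rollup window.
function rollupStats(appendsPerSecond, windowMs) {
  const rolledUpPerSecond = 1000 / windowMs; // one rolled-up message per window
  const tokensPerMessage = appendsPerSecond / rolledUpPerSecond;
  return { rolledUpPerSecond, tokensPerMessage };
}

// 100 appends/s with a 40ms window: 25 rolled-up msg/s, ~4 tokens per message
const stats = rollupStats(100, 40);
```
</Code>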

The following table shows how the `appendRollupWindow` affects the delivery rate to subscribers, concurrent stream capacity, and the trade-off in delivery granularity:

| `appendRollupWindow` | Client delivery rate | Tokens per delivery (at 100 tok/s) | Concurrent streams per connection |
| --- | --- | --- | --- |
| 40ms *(default)* | 25 msg/s | ~4 | 2 |
| 100ms | 10 msg/s | ~10 | 5 |
| 200ms | 5 msg/s | ~20 | 10 |
| 500ms *(max)* | 2 msg/s | ~50 | 25 |

_Concurrent streams per connection is calculated based on the connection's [max inbound rate limit](/docs/platform/pricing/limits#connection)._

A longer rollup window allows more concurrent streams per connection, but subscribers receive tokens in larger, less frequent batches. The default 40ms window provides smooth delivery for up to two concurrent streams. Increasing the window to 100ms or 200ms accommodates more streams with a modest reduction in delivery granularity.

Set the rollup window when creating the client:

<Code>
```javascript
const ably = new Ably.Realtime({
  key: '{{API_KEY}}',
  echoMessages: false,
  transportParams: { appendRollupWindow: 100 }
});
```
```python
ably = AblyRealtime(
    key='{{API_KEY}}',
    echo_messages=False,
    transport_params={'appendRollupWindow': 100}
)
```
```java
ClientOptions options = new ClientOptions();
options.key = "{{API_KEY}}";
options.echoMessages = false;
options.transportParams = Map.of("appendRollupWindow", "100");
AblyRealtime ably = new AblyRealtime(options);
```
</Code>

For full detail on rollup configuration, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#rollup).

### Message per token <a id="rate-limits-per-token"/>

With [message per token](/docs/ai-transport/token-streaming/message-per-token), each token is a separate `publish()` call with no rollup. Every publish counts directly against the [connection inbound message rate limit](/docs/platform/pricing/limits#connection). A model outputting 50 tokens per second saturates a single connection, leaving no capacity for additional streams.

Account for your model's peak output rate when determining how many concurrent streams a single connection can support. For strategies to stay within limits, see [token streaming limits](/docs/ai-transport/token-streaming/token-rate-limits#per-token).
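As a sketch of that capacity calculation (illustrative only; the helper name is hypothetical, and the 50 msg/s figure assumes the inbound limit from the example above):

<Code>
```javascript
// Illustrative only: how many concurrent per-token streams fit on one
// connection, given its inbound message rate limit and the model's peak rate.
function streamsPerConnection(inboundLimitPerSecond, peakTokensPerSecond) {
  return Math.floor(inboundLimitPerSecond / peakTokensPerSecond);
}

// A model peaking at 50 tok/s on a 50 msg/s connection: one stream only
const capacity = streamsPerConnection(50, 50);
```
</Code>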

## Multiple concurrent streams <a id="multiple-streams"/>

When your server handles more concurrent AI response streams than a single connection supports, create additional Realtime clients. Each client uses its own connection with its own message rate budget, so throughput scales linearly with the number of connections.

Route channels to connections using a stable hash of the channel name (the example below uses simple modulo hashing) so that all operations for a given channel always go through the same connection. This preserves [message ordering](#ordering) for each response.
<Code>
```javascript
class AblyConnectionPool {
  constructor(size, options) {
    this._connections = Array.from({ length: size }, (_, i) => {
      const client = new Ably.Realtime(options);

      client.connection.on((stateChange) => {
        console.warn(`[Pool conn ${i}] ${stateChange.previous} → ${stateChange.current}`);
        if (stateChange.current === 'failed') {
          this._replaceConnection(i);
        }
      });

      return client;
    });

    this._channelMap = new Map();
    this._options = options;
  }

  _replaceConnection(index) {
    const oldClient = this._connections[index];

    const affectedChannels = [];
    for (const [channelName, connIndex] of this._channelMap) {
      if (connIndex === index) affectedChannels.push(channelName);
    }

    try { oldClient.close(); } catch (e) { /* already dead */ }

    // A failed connection usually indicates a network or auth problem, so
    // consider adding backoff or alerting here rather than replacing indefinitely.
    const newClient = new Ably.Realtime(this._options);
    newClient.connection.on((stateChange) => {
      console.warn(`[Pool conn ${index}] ${stateChange.previous} → ${stateChange.current}`);
      if (stateChange.current === 'failed') {
        this._replaceConnection(index);
      }
    });
    this._connections[index] = newClient;

    console.warn(`[Pool] Replaced conn ${index}, ${affectedChannels.length} channels affected`);
  }

  _hashCode(str) {
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) - hash + str.charCodeAt(i)) | 0;
    }
    return Math.abs(hash);
  }

  getClient(channelName) {
    const index = this._hashCode(channelName) % this._connections.length;
    this._channelMap.set(channelName, index);
    return this._connections[index];
  }

  getChannel(channelName, channelOptions) {
    const client = this.getClient(channelName);
    return client.channels.get(channelName, channelOptions);
  }

  releaseChannel(channelName) {
    const index = this._channelMap.get(channelName);
    if (index !== undefined) {
      const client = this._connections[index];
      client.channels.release(channelName);
      this._channelMap.delete(channelName);
    }
  }

  async close() {
    const detachPromises = [];
    for (const [channelName, index] of this._channelMap) {
      const client = this._connections[index];
      const channel = client.channels.get(channelName);
      detachPromises.push(channel.detach().catch(() => {}));
    }
    await Promise.allSettled(detachPromises);
    this._channelMap.clear();

    for (const client of this._connections) {
      client.close();
    }
  }

  getStatus() {
    return this._connections.map((client, i) => ({
      index: i,
      state: client.connection.state,
      channels: [...this._channelMap.entries()]
        .filter(([, idx]) => idx === i)
        .map(([name]) => name),
    }));
  }
}

const pool = new AblyConnectionPool(10, {
  key: 'your-api-key',
  echoMessages: false,
  transportParams: { appendRollupWindow: 100 },
});

// Publish a streaming response
const channel = pool.getChannel('ai:session-123');

// Ensure the channel rule has "Message annotations, updates, deletes and appends"
// enabled for the ai:* namespace in your Ably dashboard
const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });

for await (const event of stream) {
  if (event.type === 'token') {
    channel.appendMessage({ serial: msgSerial, data: event.text });
  }
}

// When session ends
pool.releaseChannel('ai:session-123');

// On server shutdown
await pool.close();
```
</Code>

### Size the connection pool <a id="sizing"/>

The number of connections you need depends on your peak concurrent streams and your per-stream message rate.

For [message per response](/docs/ai-transport/token-streaming/message-per-response), the rollup window controls per-stream message rate:

| `appendRollupWindow` | Streams per connection | Streams per 5 conns | Streams per 10 conns | Streams per 20 conns |
| --- | --- | --- | --- | --- |
| 40ms *(default)* | 2 | 10 | 20 | 40 |
| 100ms | 5 | 25 | 50 | 100 |
| 200ms | 10 | 50 | 100 | 200 |

For [message per token](/docs/ai-transport/token-streaming/message-per-token), divide your [connection inbound message rate](/docs/platform/pricing/limits#connection) by the model's peak output rate to determine how many streams fit per connection. If a model outputs 50 tokens per second, a single stream saturates one connection.
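Putting the two inputs together, pool sizing reduces to a ceiling division. This sketch is illustrative only; the function name is hypothetical:

<Code>
```javascript
// Illustrative only: number of connections needed for a target number of
// concurrent streams, given how many streams one connection supports.
function requiredConnections(peakConcurrentStreams, streamsPerConnection) {
  return Math.ceil(peakConcurrentStreams / streamsPerConnection);
}

// 120 concurrent streams at 5 streams per connection (100ms rollup): 24 connections
const connectionsNeeded = requiredConnections(120, 5);
```
</Code>

Size for your peak concurrency rather than the average, since a pool that is too small causes publishes to be rejected once a connection exceeds its inbound rate limit.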