Skip to content

Commit 07365cd

Browse files
authored
realtime: fix streams missing chunks when streams are longer than 5 minutes and receive 408 Request Timeout errors. Also now support multiple client streams being sent to a single stream key (#1993)
* realtime: fix streams missing chunks when streams are longer than 5 minutes and receive 408 Request Timeout errors. Also now support multiple client streams being sent to a single stream key * Safely release the reader lock
1 parent 83d0e87 commit 07365cd

File tree

3 files changed

+29
-14
lines changed

3 files changed

+29
-14
lines changed

.changeset/gentle-waves-suffer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fixed an issue with realtime streams that timeout and resume streaming dropping chunks

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,19 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
5050
if (messages && messages.length > 0) {
5151
const [_key, entries] = messages[0];
5252

53-
for (const [id, fields] of entries) {
53+
for (let i = 0; i < entries.length; i++) {
54+
const [id, fields] = entries[i];
5455
lastId = id;
5556

5657
if (fields && fields.length >= 2) {
57-
if (fields[1] === END_SENTINEL) {
58+
if (fields[1] === END_SENTINEL && i === entries.length - 1) {
5859
controller.close();
5960
return;
6061
}
61-
controller.enqueue(fields[1]);
62+
63+
if (fields[1] !== END_SENTINEL) {
64+
controller.enqueue(fields[1]);
65+
}
6266

6367
if (signal.aborted) {
6468
controller.close();

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ export class MetadataStream<T> {
2222
private retryCount = 0;
2323
private readonly maxRetries: number;
2424
private currentChunkIndex = 0;
25-
private reader: ReadableStreamDefaultReader<T>;
2625

2726
constructor(private options: MetadataOptions<T>) {
2827
const [serverStream, consumerStream] = this.createTeeStreams();
2928
this.serverStream = serverStream;
3029
this.consumerStream = consumerStream;
3130
this.maxRetries = options.maxRetries ?? 10;
32-
this.reader = this.serverStream.getReader();
3331

3432
this.streamPromise = this.initializeServerStream();
3533
}
@@ -52,6 +50,8 @@ export class MetadataStream<T> {
5250
}
5351

5452
private async makeRequest(startFromChunk: number = 0): Promise<void> {
53+
const reader = this.serverStream.getReader();
54+
5555
return new Promise((resolve, reject) => {
5656
const url = new URL(this.buildUrl());
5757
const timeout = 15 * 60 * 1000; // 15 minutes
@@ -71,15 +71,20 @@ export class MetadataStream<T> {
7171
});
7272

7373
req.on("error", (error) => {
74+
safeReleaseLock(reader);
7475
reject(error);
7576
});
7677

7778
req.on("timeout", () => {
79+
safeReleaseLock(reader);
80+
7881
req.destroy(new Error("Request timed out"));
7982
});
8083

8184
req.on("response", (res) => {
8285
if (res.statusCode === 408) {
86+
safeReleaseLock(reader);
87+
8388
if (this.retryCount < this.maxRetries) {
8489
this.retryCount++;
8590

@@ -112,7 +117,7 @@ export class MetadataStream<T> {
112117
const processStream = async () => {
113118
try {
114119
while (true) {
115-
const { done, value } = await this.reader.read();
120+
const { done, value } = await reader.read();
116121

117122
if (done) {
118123
req.end();
@@ -124,7 +129,7 @@ export class MetadataStream<T> {
124129
this.currentChunkIndex++;
125130
}
126131
} catch (error) {
127-
req.destroy(error as Error);
132+
reject(error);
128133
}
129134
};
130135

@@ -135,12 +140,7 @@ export class MetadataStream<T> {
135140
}
136141

137142
private async initializeServerStream(): Promise<void> {
138-
try {
139-
await this.makeRequest(0);
140-
} catch (error) {
141-
this.reader.releaseLock();
142-
throw error;
143-
}
143+
await this.makeRequest(0);
144144
}
145145

146146
public async wait(): Promise<void> {
@@ -174,6 +174,12 @@ async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterab
174174
yield value;
175175
}
176176
} finally {
177-
reader.releaseLock();
177+
safeReleaseLock(reader);
178178
}
179179
}
180+
181+
function safeReleaseLock(reader: ReadableStreamDefaultReader<any>) {
182+
try {
183+
reader.releaseLock();
184+
} catch (error) {}
185+
}

0 commit comments

Comments
 (0)