Skip to content

Commit 059c3a6

Browse files
committed
realtime: fix streams missing chunks when streams are longer than 5 minutes and receive 408 Request Timeout errors. Also now support multiple client streams being sent to a single stream key
1 parent f579afb commit 059c3a6

File tree

3 files changed

+21
-13
lines changed

3 files changed

+21
-13
lines changed

.changeset/gentle-waves-suffer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fixed an issue with realtime streams that timeout and resume streaming dropping chunks

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,19 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
5050
if (messages && messages.length > 0) {
5151
const [_key, entries] = messages[0];
5252

53-
for (const [id, fields] of entries) {
53+
for (let i = 0; i < entries.length; i++) {
54+
const [id, fields] = entries[i];
5455
lastId = id;
5556

5657
if (fields && fields.length >= 2) {
57-
if (fields[1] === END_SENTINEL) {
58+
if (fields[1] === END_SENTINEL && i === entries.length - 1) {
5859
controller.close();
5960
return;
6061
}
61-
controller.enqueue(fields[1]);
62+
63+
if (fields[1] !== END_SENTINEL) {
64+
controller.enqueue(fields[1]);
65+
}
6266

6367
if (signal.aborted) {
6468
controller.close();

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@ export class MetadataStream<T> {
2222
private retryCount = 0;
2323
private readonly maxRetries: number;
2424
private currentChunkIndex = 0;
25-
private reader: ReadableStreamDefaultReader<T>;
2625

2726
constructor(private options: MetadataOptions<T>) {
2827
const [serverStream, consumerStream] = this.createTeeStreams();
2928
this.serverStream = serverStream;
3029
this.consumerStream = consumerStream;
3130
this.maxRetries = options.maxRetries ?? 10;
32-
this.reader = this.serverStream.getReader();
3331

3432
this.streamPromise = this.initializeServerStream();
3533
}
@@ -52,6 +50,8 @@ export class MetadataStream<T> {
5250
}
5351

5452
private async makeRequest(startFromChunk: number = 0): Promise<void> {
53+
const reader = this.serverStream.getReader();
54+
5555
return new Promise((resolve, reject) => {
5656
const url = new URL(this.buildUrl());
5757
const timeout = 15 * 60 * 1000; // 15 minutes
@@ -71,15 +71,19 @@ export class MetadataStream<T> {
7171
});
7272

7373
req.on("error", (error) => {
74+
reader.releaseLock();
7475
reject(error);
7576
});
7677

7778
req.on("timeout", () => {
79+
reader.releaseLock();
7880
req.destroy(new Error("Request timed out"));
7981
});
8082

8183
req.on("response", (res) => {
8284
if (res.statusCode === 408) {
85+
reader.releaseLock();
86+
8387
if (this.retryCount < this.maxRetries) {
8488
this.retryCount++;
8589

@@ -112,7 +116,7 @@ export class MetadataStream<T> {
112116
const processStream = async () => {
113117
try {
114118
while (true) {
115-
const { done, value } = await this.reader.read();
119+
const { done, value } = await reader.read();
116120

117121
if (done) {
118122
req.end();
@@ -124,7 +128,7 @@ export class MetadataStream<T> {
124128
this.currentChunkIndex++;
125129
}
126130
} catch (error) {
127-
req.destroy(error as Error);
131+
reject(error);
128132
}
129133
};
130134

@@ -135,12 +139,7 @@ export class MetadataStream<T> {
135139
}
136140

137141
private async initializeServerStream(): Promise<void> {
138-
try {
139-
await this.makeRequest(0);
140-
} catch (error) {
141-
this.reader.releaseLock();
142-
throw error;
143-
}
142+
await this.makeRequest(0);
144143
}
145144

146145
public async wait(): Promise<void> {

0 commit comments

Comments
 (0)