Skip to content

Commit 44308f1

Browse files
committed
fix: realtime streams from tasks now retry if they receive a 408 timeout error from the realtime/trigger.dev server
1 parent 006951a commit 44308f1

File tree

2 files changed

+113
-21
lines changed

2 files changed

+113
-21
lines changed

.changeset/hip-cups-wave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fix issue where realtime streams would cut off after 5 minutes

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 108 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import { request as httpsRequest } from "node:https";
2+
import { request as httpRequest } from "node:http";
3+
import { URL } from "node:url";
4+
15
export type MetadataOptions<T> = {
26
baseUrl: string;
37
runId: string;
@@ -7,57 +11,140 @@ export type MetadataOptions<T> = {
711
signal?: AbortSignal;
812
version?: "v1" | "v2";
913
target?: "self" | "parent" | "root";
14+
maxRetries?: number;
1015
};
1116

1217
export class MetadataStream<T> {
1318
private controller = new AbortController();
1419
private serverStream: ReadableStream<T>;
1520
private consumerStream: ReadableStream<T>;
16-
private streamPromise: Promise<void | Response>;
21+
private streamPromise: Promise<void>;
22+
private retryCount = 0;
23+
private readonly maxRetries: number;
24+
private currentChunkIndex = 0;
25+
private reader: ReadableStreamDefaultReader<T>;
1726

1827
constructor(private options: MetadataOptions<T>) {
1928
const [serverStream, consumerStream] = this.createTeeStreams();
2029
this.serverStream = serverStream;
2130
this.consumerStream = consumerStream;
31+
this.maxRetries = options.maxRetries ?? 10;
32+
this.reader = this.serverStream.getReader();
2233

2334
this.streamPromise = this.initializeServerStream();
2435
}
2536

2637
private createTeeStreams() {
2738
const readableSource = new ReadableStream<T>({
2839
start: async (controller) => {
29-
for await (const value of this.options.source) {
30-
controller.enqueue(value);
40+
try {
41+
for await (const value of this.options.source) {
42+
controller.enqueue(value);
43+
}
44+
controller.close();
45+
} catch (error) {
46+
controller.error(error);
3147
}
32-
33-
controller.close();
3448
},
3549
});
3650

3751
return readableSource.tee();
3852
}
3953

40-
private initializeServerStream(): Promise<Response> {
41-
const serverStream = this.serverStream.pipeThrough(
42-
new TransformStream<T, string>({
43-
async transform(chunk, controller) {
44-
controller.enqueue(JSON.stringify(chunk) + "\n");
54+
private async makeRequest(startFromChunk: number = 0): Promise<void> {
55+
return new Promise((resolve, reject) => {
56+
const url = new URL(this.buildUrl());
57+
const timeout = 15 * 60 * 1000; // 15 minutes
58+
59+
const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest;
60+
const req = requestFn({
61+
method: "POST",
62+
hostname: url.hostname,
63+
port: url.port || (url.protocol === "https:" ? 443 : 80),
64+
path: url.pathname + url.search,
65+
headers: {
66+
...this.options.headers,
67+
"Content-Type": "application/json",
68+
"X-Resume-From-Chunk": startFromChunk.toString(),
4569
},
46-
})
47-
);
48-
49-
return fetch(this.buildUrl(), {
50-
method: "POST",
51-
headers: this.options.headers ?? {},
52-
body: serverStream,
53-
signal: this.controller.signal,
54-
// @ts-expect-error
55-
duplex: "half",
70+
timeout,
71+
});
72+
73+
req.on("error", (error) => {
74+
reject(error);
75+
});
76+
77+
req.on("timeout", () => {
78+
req.destroy(new Error("Request timed out"));
79+
});
80+
81+
req.on("response", (res) => {
82+
if (res.statusCode === 408) {
83+
if (this.retryCount < this.maxRetries) {
84+
this.retryCount++;
85+
86+
resolve(this.makeRequest(this.currentChunkIndex));
87+
return;
88+
}
89+
reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
90+
return;
91+
}
92+
93+
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
94+
const error = new Error(`HTTP error! status: ${res.statusCode}`);
95+
reject(error);
96+
return;
97+
}
98+
99+
res.on("end", () => {
100+
resolve();
101+
});
102+
103+
res.resume();
104+
});
105+
106+
if (this.options.signal) {
107+
this.options.signal.addEventListener("abort", () => {
108+
req.destroy(new Error("Request aborted"));
109+
});
110+
}
111+
112+
const processStream = async () => {
113+
try {
114+
while (true) {
115+
const { done, value } = await this.reader.read();
116+
117+
if (done) {
118+
req.end();
119+
break;
120+
}
121+
122+
const stringified = JSON.stringify(value) + "\n";
123+
req.write(stringified);
124+
this.currentChunkIndex++;
125+
}
126+
} catch (error) {
127+
req.destroy(error as Error);
128+
}
129+
};
130+
131+
processStream().catch((error) => {
132+
reject(error);
133+
});
56134
});
57135
}
58136

137+
private async initializeServerStream(): Promise<void> {
138+
try {
139+
await this.makeRequest(0);
140+
} catch (error) {
141+
this.reader.releaseLock();
142+
throw error;
143+
}
144+
}
145+
59146
public async wait(): Promise<void> {
60-
return this.streamPromise.then(() => void 0);
147+
return this.streamPromise;
61148
}
62149

63150
public [Symbol.asyncIterator]() {

0 commit comments

Comments
 (0)