fix: realtime streams from tasks now retry if they receive a 408 timeout error from the realtime/trigger.dev server #1952
🦋 Changeset detected
Latest commit: 44308f1
The changes in this PR will be included in the next version bump.
Walkthrough
This update refactors the realtime streaming implementation in the …
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant MetadataStream
participant Server
Client->>MetadataStream: start streaming (with options)
MetadataStream->>Server: HTTP/HTTPS request (stream chunks)
loop For each chunk
MetadataStream->>Server: Send chunk (JSON)
Server-->>MetadataStream: Ack/continue
end
alt Timeout (HTTP 408)
MetadataStream->>Server: Retry request (resume from last chunk)
end
Server-->>MetadataStream: Stream complete / error
MetadataStream-->>Client: Resolve/reject promise
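The chunk framing and resume step in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the helper names `encodeChunk` and `buildHeaders` are hypothetical, and only the newline-delimited JSON framing and the `X-Resume-From-Chunk` header are taken from the diff under review.

```typescript
// Illustrative helpers; the names are hypothetical and not taken from the PR.

/** Frame one value as a single newline-delimited JSON line. */
function encodeChunk(value: unknown): string {
  return JSON.stringify(value) + "\n";
}

/** Merge caller headers with the content type and the resume index. */
function buildHeaders(
  base: Record<string, string>,
  startFromChunk: number
): Record<string, string> {
  return {
    ...base,
    "Content-Type": "application/json",
    // Tells the server which chunk index this request starts from.
    "X-Resume-From-Chunk": startFromChunk.toString(),
  };
}
```

On a retry, the client would call `buildHeaders` again with the index it wants to resume from, so each request carries its own starting position.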
Actionable comments posted: 3
🧹 Nitpick comments (2)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
18-25: `controller` and several fields are never used; consider pruning to keep the class lean.
`controller`, `serverStream`, `consumerStream` and `reader` are the only state you actually need for the current implementation. `controller` is instantiated but never referenced, while `retryCount`, `maxRetries`, `currentChunkIndex` and `reader` are all required. Removing dead fields avoids cognitive overhead and keeps the object model simple.

.changeset/hip-cups-wave.md (1)
5-6: Consider elaborating the changelog entry for future maintainers.
A single-line summary is terse; adding one or two sentences about the switch to `http/https` native requests and the retry mechanism will make the historical context clearer when scanning the changelog.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
.changeset/hip-cups-wave.md (1 hunks)
packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
146-148: Public `wait()` is clear and concise, nice touch!
Providing a single promise that represents the lifecycle of the network request gives callers a simple primitive for `await`-ing completion.
    if (done) {
      req.end();
      break;
    }

    const stringified = JSON.stringify(value) + "\n";
    req.write(stringified);
    this.currentChunkIndex++;
  }
🛠️ Refactor suggestion
Release reader lock once the final chunk is flushed.
After `done` is returned the lock is still active. This prevents GC of the stream and can block callers that might wish to re-read.
- if (done) {
- req.end();
- break;
- }
+ if (done) {
+ req.end();
+ this.reader.releaseLock();
+ break;
+ }
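To make the locking behaviour concrete, here is a small standalone sketch using the WHATWG `ReadableStream` global (available in Node.js 18+ and modern browsers). The function name `lockStates` is made up for illustration and is not part of the PR.

```typescript
// Demonstrates how getReader() and releaseLock() affect stream.locked.
function lockStates(): { afterGetReader: boolean; afterRelease: boolean } {
  const stream = new ReadableStream<string>({
    start(controller) {
      controller.enqueue("chunk");
      controller.close();
    },
  });

  const reader = stream.getReader();
  // While a reader is attached, no second reader can be acquired.
  const afterGetReader = stream.locked;

  // Releasing is safe here because no read() call is pending.
  reader.releaseLock();
  const afterRelease = stream.locked;

  return { afterGetReader, afterRelease };
}
```

Once the lock is released, a later `getReader()` call on the same stream succeeds instead of throwing.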
  const readableSource = new ReadableStream<T>({
    start: async (controller) => {
-     for await (const value of this.options.source) {
-       controller.enqueue(value);
+     try {
+       for await (const value of this.options.source) {
+         controller.enqueue(value);
+       }
+       controller.close();
+     } catch (error) {
+       controller.error(error);
      }
-
-     controller.close();
    },
  });
🛠️ Refactor suggestion
Error-first propagation is good, but release reader lock on failure.
`controller.error(error)` is great for signalling downstream consumers, however the reader lock on `this.serverStream` is still held.
If an upstream error is thrown, subsequent retries will fail to acquire the lock, leading to a stalled pipeline.
} catch (error) {
- controller.error(error);
+ controller.error(error);
+ // Free the lock so a retry can acquire it.
+ this.reader?.releaseLock?.();
}
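As a rough illustration of how an upstream failure reaches a consumer, the sketch below wraps an async iterable in a `ReadableStream` the same way the diff does and drains it with a reader that always releases its lock. The names `failingSource`, `toReadable` and `drain` are hypothetical and exist only for this example.

```typescript
// A source that yields one value and then fails, standing in for options.source.
async function* failingSource(): AsyncGenerator<number> {
  yield 1;
  throw new Error("upstream failed");
}

// Same pattern as the diff: enqueue values, close on success, error on failure.
function toReadable<T>(source: AsyncIterable<T>): ReadableStream<T> {
  return new ReadableStream<T>({
    start: async (controller) => {
      try {
        for await (const value of source) {
          controller.enqueue(value);
        }
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });
}

// Reads until done or error; the finally block releases the lock either way.
async function drain<T>(
  stream: ReadableStream<T>
): Promise<{ values: T[]; error?: unknown }> {
  const reader = stream.getReader();
  const values: T[] = [];
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return { values };
      values.push(result.value as T);
    }
  } catch (error) {
    // controller.error() surfaces here as a rejected read().
    return { values, error };
  } finally {
    reader.releaseLock();
  }
}
```

Putting `releaseLock()` in a `finally` block is one way to guarantee the lock is freed on both the success and failure paths, which is the concern raised in this comment.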
+ private async makeRequest(startFromChunk: number = 0): Promise<void> {
+   return new Promise((resolve, reject) => {
+     const url = new URL(this.buildUrl());
+     const timeout = 15 * 60 * 1000; // 15 minutes
+
+     const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest;
+     const req = requestFn({
+       method: "POST",
+       hostname: url.hostname,
+       port: url.port || (url.protocol === "https:" ? 443 : 80),
+       path: url.pathname + url.search,
+       headers: {
+         ...this.options.headers,
+         "Content-Type": "application/json",
+         "X-Resume-From-Chunk": startFromChunk.toString(),
+       },
-     })
-   );
-
-   return fetch(this.buildUrl(), {
-     method: "POST",
-     headers: this.options.headers ?? {},
-     body: serverStream,
-     signal: this.controller.signal,
-     // @ts-expect-error
-     duplex: "half",
+       timeout,
      });
+
+     req.on("error", (error) => {
+       reject(error);
+     });
+
+     req.on("timeout", () => {
+       req.destroy(new Error("Request timed out"));
+     });
+
+     req.on("response", (res) => {
+       if (res.statusCode === 408) {
+         if (this.retryCount < this.maxRetries) {
+           this.retryCount++;
+
+           resolve(this.makeRequest(this.currentChunkIndex));
+           return;
+         }
+         reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
+         return;
+       }
+
+       if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
+         const error = new Error(`HTTP error! status: ${res.statusCode}`);
+         reject(error);
+         return;
+       }
+
+       res.on("end", () => {
+         resolve();
+       });
+
+       res.resume();
+     });
+
+     if (this.options.signal) {
+       this.options.signal.addEventListener("abort", () => {
+         req.destroy(new Error("Request aborted"));
+       });
+     }
+
+     const processStream = async () => {
+       try {
+         while (true) {
+           const { done, value } = await this.reader.read();
+
+           if (done) {
+             req.end();
+             break;
+           }
+
+           const stringified = JSON.stringify(value) + "\n";
+           req.write(stringified);
+           this.currentChunkIndex++;
+         }
+       } catch (error) {
+         req.destroy(error as Error);
+       }
+     };
+
+     processStream().catch((error) => {
+       reject(error);
+     });
+   });
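The way the request target is derived from the URL can be isolated into a small pure function, sketched below. `resolveTarget` and the `Target` shape are hypothetical names for illustration; the logic mirrors the `hostname`, `port` and `path` options in the hunk above, with the port normalized to a number.

```typescript
// Hypothetical shape describing where a request should be sent.
interface Target {
  secure: boolean; // true when the https module should be used
  hostname: string;
  port: number;
  path: string;
}

function resolveTarget(rawUrl: string): Target {
  const url = new URL(rawUrl);
  const secure = url.protocol === "https:";
  return {
    secure,
    hostname: url.hostname,
    // URL.port is "" when the URL uses the scheme's default port.
    port: url.port ? Number(url.port) : secure ? 443 : 80,
    // Keep the query string, since URL.pathname alone drops it.
    path: url.pathname + url.search,
  };
}
```

Keeping this step separate makes the default-port and query-string handling easy to test without opening a socket.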
Potential socket & memory leak on 408 retry path.
- When a 408 is received you `resolve(this.makeRequest(...))` but you never:
  • destroy the current socket
  • drain `res` (to prevent 'socket hang up' warnings)
  • release the reader lock
- `makeRequest` is recursively invoked, which builds an ever-growing call stack if many retries occur. A simple loop is safer.
- The `currentChunkIndex` is incremented after the chunk is written, so the value passed in `"X-Resume-From-Chunk"` on retry points to the next chunk rather than the last confirmed one; the server may miss one chunk.
@@
- if (res.statusCode === 408) {
- if (this.retryCount < this.maxRetries) {
- this.retryCount++;
-
- resolve(this.makeRequest(this.currentChunkIndex));
- return;
- }
- reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
- return;
- }
+ if (res.statusCode === 408) {
+ res.resume(); // consume data
+ req.destroy(); // close socket
+ this.reader.releaseLock();
+
+ if (this.retryCount < this.maxRetries) {
+ this.retryCount++;
+ // Pass the *last* successful chunk index
+ resolve(this.makeRequest(this.currentChunkIndex - 1));
+ return;
+ }
+ reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
+ return;
+ }
Optionally, replace the recursive strategy with a `while` loop to avoid unbounded stack growth.
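A loop-based variant might look roughly like the sketch below. It is an illustration under stated assumptions rather than a drop-in replacement: the `Attempt` callback is a hypothetical stand-in for one HTTP request, and it is assumed to report the chunk index the next attempt should resume from.

```typescript
// Hypothetical transport: performs one request starting at resumeFrom and
// resolves with the HTTP status plus the index to resume from next time.
type Attempt = (
  resumeFrom: number
) => Promise<{ status: number; nextChunk: number }>;

// Retries only on 408, up to maxRetries times; resolves with the retry count.
async function sendWithRetries(
  attempt: Attempt,
  maxRetries: number
): Promise<number> {
  let resumeFrom = 0;
  let retries = 0;

  while (true) {
    const { status, nextChunk } = await attempt(resumeFrom);

    if (status >= 200 && status < 300) return retries;
    if (status !== 408) throw new Error(`HTTP error! status: ${status}`);
    if (retries >= maxRetries) {
      throw new Error(`Max retries (${maxRetries}) exceeded after timeout`);
    }

    retries++;
    resumeFrom = nextChunk;
  }
}
```

Because every attempt is awaited inside one loop, cleanup such as destroying the previous socket could happen in a single place before the next iteration starts.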