-
-
Notifications
You must be signed in to change notification settings - Fork 724
realtime: fix streams missing chunks when streams are longer than 5 minutes and receive 408 Request Timeout errors. Also now support multiple client streams being sent to a single stream key #1993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
🦋 Changeset detectedLatest commit: 9967dc3 The changes in this PR will be included in the next version bump. Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
Caution Review failedThe pull request is closed. WalkthroughThis update addresses issues in the realtime streaming and metadata streaming functionalities. It refines how stream entries are processed and ensures that no data chunks are lost during timeout and resume events. The changes include adjustments to the handling of end-of-stream sentinels in Redis-based streams and a refactor of the Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant MetadataStream
participant ServerStream
Client->>MetadataStream: makeRequest()
MetadataStream->>ServerStream: getReader()
loop For each chunk
ServerStream-->>MetadataStream: chunk
MetadataStream-->>Client: process chunk
end
alt Error/Timeout/HTTP 408
MetadataStream->>ServerStream: releaseReader()
MetadataStream-->>Client: reject promise
end
Possibly related PRs
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (3)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
118-132
: 🛠️ Refactor suggestionReader lock is never released on normal completion
When the source stream ends (
done === true
) wereq.end()
but do notreleaseLock()
.
That leaves the lock held for the remainder of the process, leaking resources and preventing other readers.if (done) { req.end(); + releaseReader(); // <-- ensure lock is freed break; }
(Assumes the helper
releaseReader()
from the previous comment.)
🧹 Nitpick comments (2)
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
53-66
: Sentinel handling looks solid – minor readability tweak suggestedGreat catch on ensuring the sentinel only closes the stream when it’s the final entry in the batch; this removes the “early-close” hazard when multiple writers reuse the same key.
For readability / maintainability consider destructuring the Redis field tuple so the intent is explicit:
-const [id, fields] = entries[i]; -… -if (fields && fields.length >= 2) { - if (fields[1] === END_SENTINEL && i === entries.length - 1) { +const [id, [fieldName, fieldValue]] = entries[i]; // ["data", value] +… +if (fieldName === "data") { + if (fieldValue === END_SENTINEL && i === entries.length - 1) { controller.close(); return; } - if (fields[1] !== END_SENTINEL) { - controller.enqueue(fields[1]); + if (fieldValue !== END_SENTINEL) { + controller.enqueue(fieldValue); } }This makes it obvious we’re always working with the
"data"
field and avoids the slightly magicfields[1]
index.
Feel free to keep the current code if you prefer, but the destructure pattern reduces the chance of index mistakes when additional fields are added later..changeset/gentle-waves-suffer.md (1)
1-6
: Polish changelog wording for clarity-Fixed an issue with realtime streams that timeout and resume streaming dropping chunks +Fixed an issue where realtime streams that timed out and then resumed were dropping chunks.Adds past-tense consistency, clarifies subject, and inserts a missing comma.
🧰 Tools
🪛 LanguageTool
[style] ~4-~4: Consider using a different verb for a more formal wording.
Context: --- "@trigger.dev/sdk": patch --- Fixed an issue with realtime streams that tim...(FIX_RESOLVE)
[uncategorized] ~5-~5: Possible missing comma found.
Context: ...ealtime streams that timeout and resume streaming dropping chunks(AI_HYDRA_LEO_MISSING_COMMA)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
.changeset/gentle-waves-suffer.md
(1 hunks)apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
(1 hunks)packages/core/src/v3/runMetadata/metadataStream.ts
(5 hunks)
🧰 Additional context used
🪛 LanguageTool
.changeset/gentle-waves-suffer.md
[style] ~4-~4: Consider using a different verb for a more formal wording.
Context: --- "@trigger.dev/sdk": patch --- Fixed an issue with realtime streams that tim...
(FIX_RESOLVE)
[uncategorized] ~5-~5: Possible missing comma found.
Context: ...ealtime streams that timeout and resume streaming dropping chunks
(AI_HYDRA_LEO_MISSING_COMMA)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
90-94
: Potential race:processStream()
may reject after promise already resolvedOn a 408 retry path you
resolve(this.makeRequest(...))
butprocessStream
can still throw after the lock is released, invokingreject(error)
on a promise that is already settled.
While silently ignored by Node, it surfaces as an unhandled-rejection warning in strict environments.Consider cancelling the
processStream
loop once you decide to retry (e.g., by breaking the loop after releasing the reader or checkingreaderReleased
), or move the retry logic outside thePromise
constructor to avoid the resolve-then-reject scenario.
…inutes and receive 408 Request Timeout errors. Also now support multiple client streams being sent to a single stream key
Summary by CodeRabbit
Bug Fixes
Refactor