-
-
Notifications
You must be signed in to change notification settings - Fork 724
fix: moved dequeuing from a worker queue out of lua to be able to use blpop #2151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
WalkthroughThe changes introduce a new public asynchronous method, ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal-packages/run-engine/src/run-queue/index.test.ts
(3 hunks)internal-packages/run-engine/src/run-queue/index.ts
(3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal-packages/run-engine/src/run-queue/index.test.ts (1)
internal-packages/testcontainers/src/index.ts (1)
redisTest
(167-167)
internal-packages/run-engine/src/run-queue/index.ts (3)
internal-packages/run-engine/src/run-queue/keyProducer.ts (2)
messageKey
(184-188)workerQueueKey
(40-42)apps/webapp/app/v3/marqs/index.server.ts (6)
message
(453-459)message
(461-467)message
(843-847)message
(1057-1257)message
(1321-1389)message
(1391-1450)internal-packages/run-engine/src/run-queue/types.ts (2)
OutputPayload
(31-31)OutputPayload
(33-33)
⏰ Context from checks skipped due to timeout of 90000ms (25)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (10, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (9, 10)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 10)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (4)
internal-packages/run-engine/src/run-queue/index.ts (3)
441-472
: LGTM! Clean implementation of message retrieval by key.The new
readMessageFromKey
method follows the existing patterns and properly handles parsing errors with appropriate logging.
1445-1448
: Good refactoring to use native Redis BLPOP.The switch from a custom Lua script to Redis's native
BLPOP
command simplifies the implementation and leverages Redis's built-in blocking behavior. The configurable timeout is a nice touch.
1648-1652
: Clean simplification after removing Lua commands.The method is now focused solely on creating a duplicate Redis client, which aligns perfectly with the refactored approach using native Redis commands.
internal-packages/run-engine/src/run-queue/index.test.ts (1)
581-581
: LGTM! Test cleanup aligns with the refactored implementation.Removing explicit timeout options from the
redisTest
calls is appropriate since the blocking behavior is now handled by the native Redis BLPOP command with its own timeout configuration.Also applies to: 662-662, 809-809
cleanup().then(() => { | ||
this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { | ||
service: this.name, | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider awaiting the cleanup or handling potential errors.
The async cleanup of the blocking client doesn't await the promise. If cleanup()
fails, the error won't be caught and could lead to unhandled promise rejections.
Consider either awaiting the cleanup:
- cleanup().then(() => {
- this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
- service: this.name,
- });
- });
+ await cleanup().then(() => {
+ this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
+ service: this.name,
+ });
+ }).catch((error) => {
+ this.logger.error("dequeueMessageFromWorkerQueue cleanup failed", {
+ error,
+ service: this.name,
+ });
+ });
Or at least add error handling:
cleanup().then(() => {
this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
service: this.name,
});
+ }).catch((error) => {
+ this.logger.error("dequeueMessageFromWorkerQueue cleanup failed", {
+ error,
+ service: this.name,
+ });
});
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
cleanup().then(() => { | |
this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { | |
service: this.name, | |
}); | |
}); | |
await cleanup().then(() => { | |
this.logger.debug("dequeueMessageFromWorkerQueue cleanup", { | |
service: this.name, | |
}); | |
}).catch((error) => { | |
this.logger.error("dequeueMessageFromWorkerQueue cleanup failed", { | |
error, | |
service: this.name, | |
}); | |
}); |
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/run-queue/index.ts around lines 1452 to
1456, the cleanup() async call is not awaited, which risks unhandled promise
rejections if cleanup fails. Modify the code to either await the cleanup() call
to ensure it completes before proceeding or add a .catch() handler to log or
handle any errors from cleanup(). This will properly handle potential errors and
avoid unhandled promise rejections.
No description provided.