fix: moved dequeuing from a worker queue out of lua to be able to use blpop #2151

Merged 1 commit on Jun 4, 2025
8 changes: 3 additions & 5 deletions internal-packages/run-engine/src/run-queue/index.test.ts
@@ -578,7 +578,7 @@ describe("RunQueue", () => {
}
});

- redisTest("Acking", { timeout: 5_000 }, async ({ redisContainer, redisOptions }) => {
+ redisTest("Acking", async ({ redisContainer, redisOptions }) => {
const queue = new RunQueue({
...testOptions,
queueSelectionStrategy: new FairQueueSelectionStrategy({
@@ -659,7 +659,7 @@ describe("RunQueue", () => {
}
});

- redisTest("Ack (before dequeue)", { timeout: 5_000 }, async ({ redisContainer }) => {
+ redisTest("Ack (before dequeue)", async ({ redisContainer }) => {
const queue = new RunQueue({
...testOptions,
queueSelectionStrategy: new FairQueueSelectionStrategy({
@@ -718,7 +718,6 @@ describe("RunQueue", () => {

redisTest(
"Ack after moving to workerQueue with removeFromWorkerQueue = undefined",
- { timeout: 5_000 },
async ({ redisContainer }) => {
const queue = new RunQueue({
...testOptions,
@@ -763,7 +762,6 @@ describe("RunQueue", () => {

redisTest(
"Ack after moving to workerQueue with removeFromWorkerQueue = true",
- { timeout: 5_000 },
async ({ redisContainer }) => {
const queue = new RunQueue({
...testOptions,
@@ -808,7 +806,7 @@ describe("RunQueue", () => {
}
);

- redisTest("Nacking", { timeout: 15_000 }, async ({ redisContainer, redisOptions }) => {
+ redisTest("Nacking", async ({ redisContainer, redisOptions }) => {
const queue = new RunQueue({
...testOptions,
queueSelectionStrategy: new FairQueueSelectionStrategy({
101 changes: 44 additions & 57 deletions internal-packages/run-engine/src/run-queue/index.ts
@@ -438,6 +438,39 @@ export class RunQueue {
);
}

+ public async readMessageFromKey(messageKey: string) {
+ return this.#trace(
+ "readMessageFromKey",
+ async (span) => {
+ const rawMessage = await this.redis.get(messageKey);
+
+ if (!rawMessage) {
+ return;
+ }
+
+ const message = OutputPayload.safeParse(JSON.parse(rawMessage));
+
+ if (!message.success) {
+ this.logger.error(`[${this.name}] Failed to parse message`, {
+ messageKey,
+ error: message.error,
+ service: this.name,
+ });
+
+ return;
+ }
+
+ return message.data;
+ },
+ {
+ attributes: {
+ [SEMATTRS_MESSAGING_OPERATION]: "receive",
+ [SEMATTRS_MESSAGING_SYSTEM]: "marqs",
+ },
+ }
+ );
+ }
+
public async enqueueMessage({
env,
message,
@@ -1409,17 +1442,18 @@ export class RunQueue {

this.abortController.signal.addEventListener("abort", cleanup);

- const result = await blockingClient.dequeueMessageFromWorkerQueue(
- //keys
+ const result = await blockingClient.blpop(
workerQueueKey,
- //args
- this.options.redis.keyPrefix ?? "",
- String(this.options.dequeueBlockingTimeoutSeconds ?? 10)
+ this.options.dequeueBlockingTimeoutSeconds ?? 10
);

this.abortController.signal.removeEventListener("abort", cleanup);

- await cleanup();
+ cleanup().then(() => {
+ this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
+ service: this.name,
+ });
+ });
Comment on lines +1452 to +1456

⚠️ Potential issue

Consider awaiting the cleanup or handling potential errors.

The cleanup() call on the blocking client is no longer awaited and has no rejection handler. If cleanup() rejects, nothing catches the error, and it surfaces as an unhandled promise rejection.

Consider either awaiting the cleanup:

-    cleanup().then(() => {
-      this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
-        service: this.name,
-      });
-    });
+    await cleanup().then(() => {
+      this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
+        service: this.name,
+      });
+    }).catch((error) => {
+      this.logger.error("dequeueMessageFromWorkerQueue cleanup failed", {
+        error,
+        service: this.name,
+      });
+    });

Or at least add error handling:

     cleanup().then(() => {
       this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
         service: this.name,
       });
+    }).catch((error) => {
+      this.logger.error("dequeueMessageFromWorkerQueue cleanup failed", {
+        error,
+        service: this.name,
+      });
     });
🤖 Prompt for AI Agents
In internal-packages/run-engine/src/run-queue/index.ts around lines 1452 to
1456, the cleanup() async call is not awaited, which risks unhandled promise
rejections if cleanup fails. Modify the code to either await the cleanup() call
to ensure it completes before proceeding or add a .catch() handler to log or
handle any errors from cleanup(). This will properly handle potential errors and
avoid unhandled promise rejections.
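
The pattern the review comment asks for can be sketched in isolation. This is a minimal illustration, not the project's code: `runInBackground` and the `Logger` shape are hypothetical names introduced here. The point is only that a promise that is not awaited still needs a `.catch()` so a rejection gets logged instead of going unhandled.

```typescript
// Hypothetical minimal logger shape; the logger used in the PR is not shown in the diff.
interface Logger {
  debug(message: string, fields?: Record<string, unknown>): void;
  error(message: string, fields?: Record<string, unknown>): void;
}

// Runs `task` and returns a promise that resolves whether the task succeeds
// or fails: failures are logged instead of propagating. The returned promise
// can only reject if the logger itself throws inside the error handler.
function runInBackground(
  name: string,
  task: () => Promise<void>,
  logger: Logger
): Promise<void> {
  // Promise.resolve().then(task) starts the task in a microtask and turns a
  // synchronous throw from `task` into a rejection handled by .catch() below.
  return Promise.resolve()
    .then(task)
    .then(() => {
      logger.debug(`${name} finished`);
    })
    .catch((error: unknown) => {
      logger.error(`${name} failed`, { error });
    });
}
```

A caller that wants the second option from the comment (fire and forget, with error handling) can ignore the returned promise; a caller that wants the first option can `await` it.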


if (!result) {
return;
@@ -1447,24 +1481,16 @@ export class RunQueue {
return;
}

- const [messageId, rawMessage] = result;
+ const [, messageKey] = result;

- //read message
- const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage));
- if (!parsedMessage.success) {
- this.logger.error(`[${this.name}] Failed to parse message`, {
- messageId,
- error: parsedMessage.error,
- service: this.name,
- });
+ const message = await this.readMessageFromKey(messageKey);

+ if (!message) {
return;
}

- const message = parsedMessage.data;
-
return {
- messageId,
+ messageId: message.runId,
messageScore: String(message.timestamp),
message,
};
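
The hunks above replace one Lua call with two client calls: a `BLPOP` on the worker queue, then a `GET` on the popped message key. A rough sketch of that flow follows. Everything in it is an assumption for illustration: `ListClient`, `QueueMessage`, `dequeueOnce`, and `InMemoryClient` are hypothetical names, the hand-written type guard stands in for the `OutputPayload` schema, and the in-memory client never blocks.

```typescript
// Hypothetical subset of a Redis client: only the two calls this sketch needs.
interface ListClient {
  // Resolves to [listKey, poppedValue], or null when nothing was popped.
  blpop(key: string, timeoutSeconds: number): Promise<[string, string] | null>;
  get(key: string): Promise<string | null>;
}

// Only the two fields the diff reads from the parsed message.
interface QueueMessage {
  runId: string;
  timestamp: number;
}

function isQueueMessage(value: unknown): value is QueueMessage {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return typeof candidate.runId === "string" && typeof candidate.timestamp === "number";
}

async function dequeueOnce(client: ListClient, workerQueueKey: string, timeoutSeconds: number) {
  // Step 1: pop a message key from the worker queue list.
  const popped = await client.blpop(workerQueueKey, timeoutSeconds);
  if (!popped) return undefined;
  const [, messageKey] = popped;

  // Step 2: read the payload in a separate round trip. The pop and the read
  // are no longer one atomic script, so the payload may be missing by now.
  const raw = await client.get(messageKey);
  if (raw === null) return undefined;

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw); // guarded here; malformed JSON would otherwise throw
  } catch {
    return undefined;
  }
  if (!isQueueMessage(parsed)) return undefined;

  return { messageId: parsed.runId, messageScore: String(parsed.timestamp), message: parsed };
}

// In-memory stand-in so the sketch runs without a Redis server. It returns
// null immediately on an empty list instead of waiting for the timeout.
class InMemoryClient implements ListClient {
  lists = new Map<string, string[]>();
  values = new Map<string, string>();

  async blpop(key: string, _timeoutSeconds: number): Promise<[string, string] | null> {
    const value = this.lists.get(key)?.shift();
    return value === undefined ? null : [key, value];
  }

  async get(key: string): Promise<string | null> {
    return this.values.get(key) ?? null;
  }
}
```

One trade-off worth keeping in mind with this shape: a crash between the pop and the read leaves a key that has been removed from the list but never processed, which the single-script version did not allow.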
@@ -1622,45 +1648,6 @@ export class RunQueue {
#createBlockingDequeueClient() {
const blockingClient = this.redis.duplicate();

- blockingClient.defineCommand("dequeueMessageFromWorkerQueue", {
- numberOfKeys: 1,
- lua: `
- local workerQueueKey = KEYS[1]
-
- local keyPrefix = ARGV[1]
- local timeoutInSeconds = tonumber(ARGV[2])
-
- -- Attempt to dequeue using BLPOP
- -- result is either nil or [queueName, messageId]
- local result = redis.call('BLPOP', workerQueueKey, timeoutInSeconds)
-
- if not result or type(result) ~= "table" then
- return nil
- end
-
- local messageKeyValue = result[2]
-
- -- Get the message payload
- local messageKey = keyPrefix .. messageKeyValue
-
- local messagePayload = redis.call('GET', messageKey)
-
- -- if the messagePayload is nil, then the message is not in the queue
- if not messagePayload then
- return nil
- end
-
- -- messageKeyValue is {org:<orgId>}:message:<messageId> and we want to extract the messageId
- local messageId = messageKeyValue:match("([^:]+)$")
-
- if not messageId then
- return nil
- end
-
- return {messageId, messagePayload} -- Return message details
- `,
- });
-
return blockingClient;
}
