Skip to content

Commit 575413c

Browse files
authored
fix: moved dequeuing from a worker queue out of lua to be able to use blpop (#2151)
1 parent e7795a0 commit 575413c

File tree

2 files changed

+47
-62
lines changed

2 files changed

+47
-62
lines changed

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ describe("RunQueue", () => {
578578
}
579579
});
580580

581-
redisTest("Acking", { timeout: 5_000 }, async ({ redisContainer, redisOptions }) => {
581+
redisTest("Acking", async ({ redisContainer, redisOptions }) => {
582582
const queue = new RunQueue({
583583
...testOptions,
584584
queueSelectionStrategy: new FairQueueSelectionStrategy({
@@ -659,7 +659,7 @@ describe("RunQueue", () => {
659659
}
660660
});
661661

662-
redisTest("Ack (before dequeue)", { timeout: 5_000 }, async ({ redisContainer }) => {
662+
redisTest("Ack (before dequeue)", async ({ redisContainer }) => {
663663
const queue = new RunQueue({
664664
...testOptions,
665665
queueSelectionStrategy: new FairQueueSelectionStrategy({
@@ -718,7 +718,6 @@ describe("RunQueue", () => {
718718

719719
redisTest(
720720
"Ack after moving to workerQueue with removeFromWorkerQueue = undefined",
721-
{ timeout: 5_000 },
722721
async ({ redisContainer }) => {
723722
const queue = new RunQueue({
724723
...testOptions,
@@ -763,7 +762,6 @@ describe("RunQueue", () => {
763762

764763
redisTest(
765764
"Ack after moving to workerQueue with removeFromWorkerQueue = true",
766-
{ timeout: 5_000 },
767765
async ({ redisContainer }) => {
768766
const queue = new RunQueue({
769767
...testOptions,
@@ -808,7 +806,7 @@ describe("RunQueue", () => {
808806
}
809807
);
810808

811-
redisTest("Nacking", { timeout: 15_000 }, async ({ redisContainer, redisOptions }) => {
809+
redisTest("Nacking", async ({ redisContainer, redisOptions }) => {
812810
const queue = new RunQueue({
813811
...testOptions,
814812
queueSelectionStrategy: new FairQueueSelectionStrategy({

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 44 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,39 @@ export class RunQueue {
438438
);
439439
}
440440

441+
public async readMessageFromKey(messageKey: string) {
442+
return this.#trace(
443+
"readMessageFromKey",
444+
async (span) => {
445+
const rawMessage = await this.redis.get(messageKey);
446+
447+
if (!rawMessage) {
448+
return;
449+
}
450+
451+
const message = OutputPayload.safeParse(JSON.parse(rawMessage));
452+
453+
if (!message.success) {
454+
this.logger.error(`[${this.name}] Failed to parse message`, {
455+
messageKey,
456+
error: message.error,
457+
service: this.name,
458+
});
459+
460+
return;
461+
}
462+
463+
return message.data;
464+
},
465+
{
466+
attributes: {
467+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
468+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
469+
},
470+
}
471+
);
472+
}
473+
441474
public async enqueueMessage({
442475
env,
443476
message,
@@ -1409,17 +1442,18 @@ export class RunQueue {
14091442

14101443
this.abortController.signal.addEventListener("abort", cleanup);
14111444

1412-
const result = await blockingClient.dequeueMessageFromWorkerQueue(
1413-
//keys
1445+
const result = await blockingClient.blpop(
14141446
workerQueueKey,
1415-
//args
1416-
this.options.redis.keyPrefix ?? "",
1417-
String(this.options.dequeueBlockingTimeoutSeconds ?? 10)
1447+
this.options.dequeueBlockingTimeoutSeconds ?? 10
14181448
);
14191449

14201450
this.abortController.signal.removeEventListener("abort", cleanup);
14211451

1422-
await cleanup();
1452+
cleanup().then(() => {
1453+
this.logger.debug("dequeueMessageFromWorkerQueue cleanup", {
1454+
service: this.name,
1455+
});
1456+
});
14231457

14241458
if (!result) {
14251459
return;
@@ -1447,24 +1481,16 @@ export class RunQueue {
14471481
return;
14481482
}
14491483

1450-
const [messageId, rawMessage] = result;
1484+
const [, messageKey] = result;
14511485

1452-
//read message
1453-
const parsedMessage = OutputPayload.safeParse(JSON.parse(rawMessage));
1454-
if (!parsedMessage.success) {
1455-
this.logger.error(`[${this.name}] Failed to parse message`, {
1456-
messageId,
1457-
error: parsedMessage.error,
1458-
service: this.name,
1459-
});
1486+
const message = await this.readMessageFromKey(messageKey);
14601487

1488+
if (!message) {
14611489
return;
14621490
}
14631491

1464-
const message = parsedMessage.data;
1465-
14661492
return {
1467-
messageId,
1493+
messageId: message.runId,
14681494
messageScore: String(message.timestamp),
14691495
message,
14701496
};
@@ -1622,45 +1648,6 @@ export class RunQueue {
16221648
#createBlockingDequeueClient() {
16231649
const blockingClient = this.redis.duplicate();
16241650

1625-
blockingClient.defineCommand("dequeueMessageFromWorkerQueue", {
1626-
numberOfKeys: 1,
1627-
lua: `
1628-
local workerQueueKey = KEYS[1]
1629-
1630-
local keyPrefix = ARGV[1]
1631-
local timeoutInSeconds = tonumber(ARGV[2])
1632-
1633-
-- Attempt to dequeue using BLPOP
1634-
-- result is either nil or [queueName, messageId]
1635-
local result = redis.call('BLPOP', workerQueueKey, timeoutInSeconds)
1636-
1637-
if not result or type(result) ~= "table" then
1638-
return nil
1639-
end
1640-
1641-
local messageKeyValue = result[2]
1642-
1643-
-- Get the message payload
1644-
local messageKey = keyPrefix .. messageKeyValue
1645-
1646-
local messagePayload = redis.call('GET', messageKey)
1647-
1648-
-- if the messagePayload is nil, then the message is not in the queue
1649-
if not messagePayload then
1650-
return nil
1651-
end
1652-
1653-
-- messageKeyValue is {org:<orgId>}:message:<messageId> and we want to extract the messageId
1654-
local messageId = messageKeyValue:match("([^:]+)$")
1655-
1656-
if not messageId then
1657-
return nil
1658-
end
1659-
1660-
return {messageId, messagePayload} -- Return message details
1661-
`,
1662-
});
1663-
16641651
return blockingClient;
16651652
}
16661653

0 commit comments

Comments
 (0)