Skip to content

Commit f579afb

Browse files
authored
V4 dequeue performance (return faster) (#1989)
* Loop through tenants only once, return as soon as we have enough. * Hello world deadlock task set back to triggerAndWait
1 parent 3d744c6 commit f579afb

File tree

6 files changed

+29
-71
lines changed

6 files changed

+29
-71
lines changed

apps/webapp/app/env.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,6 @@ const EnvironmentSchema = z.object({
460460
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
461461
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
462462
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
463-
RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS: z.coerce.number().int().default(10),
464463

465464
RUN_ENGINE_WORKER_REDIS_HOST: z
466465
.string()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ function createRunEngine() {
5858
maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT,
5959
tracer,
6060
},
61-
maxDequeueLoopAttempts: env.RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS,
6261
},
6362
runLock: {
6463
redis: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ export class RunEngine {
113113
logger: new Logger("RunQueue", "debug"),
114114
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
115115
retryOptions: options.queue?.retryOptions,
116-
maxDequeueLoopAttempts: options.queue?.maxDequeueLoopAttempts ?? 10,
117116
});
118117

119118
this.worker = new Worker({

internal-packages/run-engine/src/engine/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ export type RunEngineOptions = {
3535
FairQueueSelectionStrategyOptions,
3636
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
3737
>;
38-
maxDequeueLoopAttempts?: number;
3938
};
4039
runLock: {
4140
redis: RedisOptions;

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
type Result,
3131
} from "@internal/redis";
3232
import { MessageNotFoundError } from "./errors.js";
33+
import { tryCatch } from "@trigger.dev/core";
3334

3435
const SemanticAttributes = {
3536
QUEUE: "runqueue.queue",
@@ -51,7 +52,6 @@ export type RunQueueOptions = {
5152
verbose?: boolean;
5253
logger?: Logger;
5354
retryOptions?: RetryOptions;
54-
maxDequeueLoopAttempts?: number;
5555
};
5656

5757
type DequeuedMessage = {
@@ -78,7 +78,6 @@ export class RunQueue {
7878
private redis: Redis;
7979
public keys: RunQueueKeyProducer;
8080
private queueSelectionStrategy: RunQueueSelectionStrategy;
81-
private maxDequeueLoopAttempts: number;
8281

8382
constructor(private readonly options: RunQueueOptions) {
8483
this.retryOptions = options.retryOptions ?? defaultRetrySettings;
@@ -94,7 +93,6 @@ export class RunQueue {
9493

9594
this.keys = options.keys;
9695
this.queueSelectionStrategy = options.queueSelectionStrategy;
97-
this.maxDequeueLoopAttempts = options.maxDequeueLoopAttempts ?? 10;
9896

9997
this.subscriber = createRedisClient(options.redis, {
10098
onError: (error) => {
@@ -396,55 +394,45 @@ export class RunQueue {
396394

397395
let attemptedEnvs = 0;
398396
let attemptedQueues = 0;
399-
let dequeueLoopAttempts = 0;
400397

401398
const messages: DequeuedMessage[] = [];
402399

403-
// Each env starts with its list of candidate queues
404-
const tenantQueues: Record<string, string[]> = {};
405-
406-
// Initialize tenantQueues with the queues for each env
407400
for (const env of envQueues) {
408-
tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array
409-
}
410-
411-
// Continue until we've hit max count or all tenants have empty queue lists
412-
while (
413-
messages.length < maxCount &&
414-
Object.values(tenantQueues).some((queues) => queues.length > 0) &&
415-
dequeueLoopAttempts < this.maxDequeueLoopAttempts
416-
) {
417-
dequeueLoopAttempts++;
401+
attemptedEnvs++;
418402

419-
for (const env of envQueues) {
420-
attemptedEnvs++;
421-
422-
// Skip if this tenant has no more queues
423-
if (tenantQueues[env.envId].length === 0) {
424-
continue;
425-
}
426-
427-
// Pop the next queue (using round-robin order)
428-
const queue = tenantQueues[env.envId].shift()!;
403+
for (const queue of env.queues) {
429404
attemptedQueues++;
430405

431406
// Attempt to dequeue from this queue
432-
const message = await this.#callDequeueMessage({
433-
messageQueue: queue,
434-
});
407+
const [error, message] = await tryCatch(
408+
this.#callDequeueMessage({
409+
messageQueue: queue,
410+
})
411+
);
412+
413+
if (error) {
414+
this.logger.error(
415+
`[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`,
416+
{
417+
error,
418+
}
419+
);
420+
}
435421

436422
if (message) {
437423
messages.push(message);
438-
// Re-add this queue at the end, since it might have more messages
439-
tenantQueues[env.envId].push(queue);
440424
}
441-
// If message is null, do not re-add the queue in this cycle
442425

443-
// If we've reached maxCount, break out of the loop
426+
// If we've reached maxCount, we don't want to look at this env anymore
444427
if (messages.length >= maxCount) {
445428
break;
446429
}
447430
}
431+
432+
// If we've reached maxCount, we're completely done
433+
if (messages.length >= maxCount) {
434+
break;
435+
}
448436
}
449437

450438
span.setAttributes({

references/hello-world/src/trigger/deadlocks.ts

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,45 +14,19 @@ export const deadlockReleasingQueue = queue({
1414
export const deadlockTester = task({
1515
id: "deadlock-tester",
1616
run: async (payload: any, { ctx }) => {
17-
// await deadlockNestedTask.triggerAndWait({
18-
// message: "Hello, world!",
19-
// });
20-
21-
await deadlockNestedTask.batchTriggerAndWait([
22-
{
23-
payload: {
24-
message: "Hello, world!",
25-
},
26-
},
27-
{
28-
payload: {
29-
message: "Hello, world!",
30-
},
31-
},
32-
]);
17+
await deadlockNestedTask.triggerAndWait({
18+
message: "Hello, world!",
19+
});
3320
},
3421
});
3522

3623
export const deadlockNestedTask = task({
3724
id: "deadlock-nested-task",
3825
queue: deadlockQueue,
3926
run: async (payload: any, { ctx }) => {
40-
// await deadlockTester.triggerAndWait({
41-
// message: "Hello, world!",
42-
// });
43-
44-
await deadlockTester.batchTriggerAndWait([
45-
{
46-
payload: {
47-
message: "Hello, world!",
48-
},
49-
},
50-
{
51-
payload: {
52-
message: "Hello, world!",
53-
},
54-
},
55-
]);
27+
await deadlockTester.triggerAndWait({
28+
message: "Hello, world!",
29+
});
5630

5731
return {
5832
message: "Hello, world!",

0 commit comments

Comments
 (0)