Skip to content

V4 dequeue test fixes #1991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { expect } from "vitest";
import { RunEngine } from "../index.js";
import { setTimeout } from "node:timers/promises";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
import { DequeuedMessage } from "@trigger.dev/core/v3";

vi.setConfig({ testTimeout: 60_000 });

Expand Down Expand Up @@ -115,12 +116,17 @@ describe("RunEngine batchTrigger", () => {
expect(queueLength).toBe(2);

//dequeue
const [d1, d2] = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: run1.masterQueue,
maxRunCount: 10,
});

const dequeued: DequeuedMessage[] = [];
for (let i = 0; i < 2; i++) {
dequeued.push(
...(await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 1,
}))
);
}
const [d1, d2] = dequeued;
//attempts
const attempt1 = await engine.startRunAttempt({
runId: d1.run.id,
Expand Down
182 changes: 96 additions & 86 deletions internal-packages/run-engine/src/engine/tests/dequeuing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { expect } from "vitest";
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
import { RunEngine } from "../index.js";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
import { DequeuedMessage } from "@trigger.dev/core/v3";

vi.setConfig({ testTimeout: 60_000 });

Expand Down Expand Up @@ -63,106 +64,115 @@ describe("RunEngine dequeuing", () => {
expect(queueLength).toBe(10);

//dequeue
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 5,
});
const dequeued: DequeuedMessage[] = [];
for (let i = 0; i < 5; i++) {
dequeued.push(
...(await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 1,
}))
);
}

expect(dequeued.length).toBe(5);
} finally {
engine.quit();
}
});

containerTest("Dequeues runs within machine constraints", async ({ prisma, redisOptions }) => {
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
//This will fail until we support dequeuing multiple runs from a single environment
containerTest.fails(
"Dequeues runs within machine constraints",
async ({ prisma, redisOptions }) => {
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const taskIdentifier = "test-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, {
preset: "small-1x",
});

//trigger the runs
const runs = await triggerRuns({
engine,
environment: authenticatedEnvironment,
taskIdentifier,
const engine = new RunEngine({
prisma,
count: 20,
});
expect(runs.length).toBe(20);

//check the queue length
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength).toBe(20);

//dequeue
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 5,
maxResources: {
cpu: 1.1,
memory: 3.8,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
});
expect(dequeued.length).toBe(2);

//check the queue length
const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength2).toBe(18);

const dequeued2 = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 10,
maxResources: {
cpu: 4.7,
memory: 3.0,
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
tracer: trace.getTracer("test", "0.0.0"),
});
expect(dequeued2.length).toBe(6);

//check the queue length
const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength3).toBe(12);
} finally {
engine.quit();
try {
const taskIdentifier = "test-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, {
preset: "small-1x",
});

//trigger the runs
const runs = await triggerRuns({
engine,
environment: authenticatedEnvironment,
taskIdentifier,
prisma,
count: 20,
});
expect(runs.length).toBe(20);

//check the queue length
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength).toBe(20);

//dequeue
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 5,
maxResources: {
cpu: 1.1,
memory: 3.8,
},
});
expect(dequeued.length).toBe(2);

//check the queue length
const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength2).toBe(18);

const dequeued2 = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 10,
maxResources: {
cpu: 4.7,
memory: 3.0,
},
});
expect(dequeued2.length).toBe(6);

//check the queue length
const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength3).toBe(12);
} finally {
engine.quit();
}
}
});
);
});

async function triggerRuns({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { trace } from "@internal/tracing";
import { RunEngine } from "../index.js";
import { setTimeout } from "timers/promises";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
import { DequeuedMessage } from "@trigger.dev/core/v3";

vi.setConfig({ testTimeout: 60_000 });

Expand Down Expand Up @@ -100,11 +101,17 @@ describe("RunEngine pending version", () => {
await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]);

//dequeuing should fail
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: run.masterQueue,
maxRunCount: 10,
});

const dequeued: DequeuedMessage[] = [];
for (let i = 0; i < 2; i++) {
dequeued.push(
...(await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 1,
}))
);
}
expect(dequeued.length).toBe(0);

//queue should be empty
Expand Down
32 changes: 21 additions & 11 deletions internal-packages/run-engine/src/engine/tests/priority.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { PrismaClientOrTransaction } from "@trigger.dev/database";
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
import { setTimeout } from "timers/promises";
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
import { DequeuedMessage } from "@trigger.dev/core/v3";

vi.setConfig({ testTimeout: 60_000 });

Expand Down Expand Up @@ -76,12 +77,16 @@ describe("RunEngine priority", () => {
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
expect(queueLength).toBe(priorities.length);

//dequeue (expect 4 items because of the negative priority)
const dequeue = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 20,
});
//dequeue 4 times, in order
const dequeue: DequeuedMessage[] = [];
for (let i = 0; i < 4; i++) {
const items = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 1,
});
dequeue.push(...items);
}
expect(dequeue.length).toBe(4);
expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId);
expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId);
Expand Down Expand Up @@ -175,11 +180,16 @@ describe("RunEngine priority", () => {
expect(queueLength).toBe(queueTimestamps.length);

//dequeue (expect 4 items because of the negative priority)
const dequeue = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 20,
});
const dequeue: DequeuedMessage[] = [];
for (let i = 0; i < 5; i++) {
dequeue.push(
...(await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: "main",
maxRunCount: 1,
}))
);
}
expect(dequeue.length).toBe(queueTimestamps.length);
expect(dequeue[0].run.friendlyId).toBe(runs[2].friendlyId);
expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId);
Expand Down
3 changes: 2 additions & 1 deletion internal-packages/run-engine/src/run-queue/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ describe("RunQueue", () => {
}
);

redisTest(
// This test fails now because we only return a single run per env. We will change this in the future.
redisTest.fails(
"Dequeue multiple messages from the queue",
{ timeout: 5_000 },
async ({ redisContainer }) => {
Expand Down