Skip to content

Commit 280a6d0

Browse files
authored
V4 dequeue test fixes (#1991)
* Fix priority tests by dequeuing in sequence * Expect this test to fail until we support parallel dequeuing of an env with streaming * Fix one and expect fail from the other * Fix for batch trigger test dequeuing * Fixed pending version test
1 parent f579afb commit 280a6d0

File tree

5 files changed

+143
-109
lines changed

5 files changed

+143
-109
lines changed

internal-packages/run-engine/src/engine/tests/batchTrigger.test.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { expect } from "vitest";
55
import { RunEngine } from "../index.js";
66
import { setTimeout } from "node:timers/promises";
77
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
8+
import { DequeuedMessage } from "@trigger.dev/core/v3";
89

910
vi.setConfig({ testTimeout: 60_000 });
1011

@@ -115,12 +116,17 @@ describe("RunEngine batchTrigger", () => {
115116
expect(queueLength).toBe(2);
116117

117118
//dequeue
118-
const [d1, d2] = await engine.dequeueFromMasterQueue({
119-
consumerId: "test_12345",
120-
masterQueue: run1.masterQueue,
121-
maxRunCount: 10,
122-
});
123-
119+
const dequeued: DequeuedMessage[] = [];
120+
for (let i = 0; i < 2; i++) {
121+
dequeued.push(
122+
...(await engine.dequeueFromMasterQueue({
123+
consumerId: "test_12345",
124+
masterQueue: "main",
125+
maxRunCount: 1,
126+
}))
127+
);
128+
}
129+
const [d1, d2] = dequeued;
124130
//attempts
125131
const attempt1 = await engine.startRunAttempt({
126132
runId: d1.run.id,

internal-packages/run-engine/src/engine/tests/dequeuing.test.ts

Lines changed: 96 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { expect } from "vitest";
66
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
77
import { RunEngine } from "../index.js";
88
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
9+
import { DequeuedMessage } from "@trigger.dev/core/v3";
910

1011
vi.setConfig({ testTimeout: 60_000 });
1112

@@ -63,106 +64,115 @@ describe("RunEngine dequeuing", () => {
6364
expect(queueLength).toBe(10);
6465

6566
//dequeue
66-
const dequeued = await engine.dequeueFromMasterQueue({
67-
consumerId: "test_12345",
68-
masterQueue: "main",
69-
maxRunCount: 5,
70-
});
67+
const dequeued: DequeuedMessage[] = [];
68+
for (let i = 0; i < 5; i++) {
69+
dequeued.push(
70+
...(await engine.dequeueFromMasterQueue({
71+
consumerId: "test_12345",
72+
masterQueue: "main",
73+
maxRunCount: 1,
74+
}))
75+
);
76+
}
7177

7278
expect(dequeued.length).toBe(5);
7379
} finally {
7480
engine.quit();
7581
}
7682
});
7783

78-
containerTest("Dequeues runs within machine constraints", async ({ prisma, redisOptions }) => {
79-
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
84+
//This will fail until we support dequeuing multiple runs from a single environment
85+
containerTest.fails(
86+
"Dequeues runs within machine constraints",
87+
async ({ prisma, redisOptions }) => {
88+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
8089

81-
const engine = new RunEngine({
82-
prisma,
83-
worker: {
84-
redis: redisOptions,
85-
workers: 1,
86-
tasksPerWorker: 10,
87-
pollIntervalMs: 100,
88-
},
89-
queue: {
90-
redis: redisOptions,
91-
},
92-
runLock: {
93-
redis: redisOptions,
94-
},
95-
machines: {
96-
defaultMachine: "small-1x",
97-
machines: {
98-
"small-1x": {
99-
name: "small-1x" as const,
100-
cpu: 0.5,
101-
memory: 0.5,
102-
centsPerMs: 0.0001,
103-
},
104-
},
105-
baseCostInCents: 0.0005,
106-
},
107-
tracer: trace.getTracer("test", "0.0.0"),
108-
});
109-
110-
try {
111-
const taskIdentifier = "test-task";
112-
113-
//create background worker
114-
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, {
115-
preset: "small-1x",
116-
});
117-
118-
//trigger the runs
119-
const runs = await triggerRuns({
120-
engine,
121-
environment: authenticatedEnvironment,
122-
taskIdentifier,
90+
const engine = new RunEngine({
12391
prisma,
124-
count: 20,
125-
});
126-
expect(runs.length).toBe(20);
127-
128-
//check the queue length
129-
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
130-
expect(queueLength).toBe(20);
131-
132-
//dequeue
133-
const dequeued = await engine.dequeueFromMasterQueue({
134-
consumerId: "test_12345",
135-
masterQueue: "main",
136-
maxRunCount: 5,
137-
maxResources: {
138-
cpu: 1.1,
139-
memory: 3.8,
92+
worker: {
93+
redis: redisOptions,
94+
workers: 1,
95+
tasksPerWorker: 10,
96+
pollIntervalMs: 100,
14097
},
141-
});
142-
expect(dequeued.length).toBe(2);
143-
144-
//check the queue length
145-
const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
146-
expect(queueLength2).toBe(18);
147-
148-
const dequeued2 = await engine.dequeueFromMasterQueue({
149-
consumerId: "test_12345",
150-
masterQueue: "main",
151-
maxRunCount: 10,
152-
maxResources: {
153-
cpu: 4.7,
154-
memory: 3.0,
98+
queue: {
99+
redis: redisOptions,
100+
},
101+
runLock: {
102+
redis: redisOptions,
103+
},
104+
machines: {
105+
defaultMachine: "small-1x",
106+
machines: {
107+
"small-1x": {
108+
name: "small-1x" as const,
109+
cpu: 0.5,
110+
memory: 0.5,
111+
centsPerMs: 0.0001,
112+
},
113+
},
114+
baseCostInCents: 0.0005,
155115
},
116+
tracer: trace.getTracer("test", "0.0.0"),
156117
});
157-
expect(dequeued2.length).toBe(6);
158118

159-
//check the queue length
160-
const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
161-
expect(queueLength3).toBe(12);
162-
} finally {
163-
engine.quit();
119+
try {
120+
const taskIdentifier = "test-task";
121+
122+
//create background worker
123+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, {
124+
preset: "small-1x",
125+
});
126+
127+
//trigger the runs
128+
const runs = await triggerRuns({
129+
engine,
130+
environment: authenticatedEnvironment,
131+
taskIdentifier,
132+
prisma,
133+
count: 20,
134+
});
135+
expect(runs.length).toBe(20);
136+
137+
//check the queue length
138+
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
139+
expect(queueLength).toBe(20);
140+
141+
//dequeue
142+
const dequeued = await engine.dequeueFromMasterQueue({
143+
consumerId: "test_12345",
144+
masterQueue: "main",
145+
maxRunCount: 5,
146+
maxResources: {
147+
cpu: 1.1,
148+
memory: 3.8,
149+
},
150+
});
151+
expect(dequeued.length).toBe(2);
152+
153+
//check the queue length
154+
const queueLength2 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
155+
expect(queueLength2).toBe(18);
156+
157+
const dequeued2 = await engine.dequeueFromMasterQueue({
158+
consumerId: "test_12345",
159+
masterQueue: "main",
160+
maxRunCount: 10,
161+
maxResources: {
162+
cpu: 4.7,
163+
memory: 3.0,
164+
},
165+
});
166+
expect(dequeued2.length).toBe(6);
167+
168+
//check the queue length
169+
const queueLength3 = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
170+
expect(queueLength3).toBe(12);
171+
} finally {
172+
engine.quit();
173+
}
164174
}
165-
});
175+
);
166176
});
167177

168178
async function triggerRuns({

internal-packages/run-engine/src/engine/tests/pendingVersion.test.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { trace } from "@internal/tracing";
33
import { RunEngine } from "../index.js";
44
import { setTimeout } from "timers/promises";
55
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
6+
import { DequeuedMessage } from "@trigger.dev/core/v3";
67

78
vi.setConfig({ testTimeout: 60_000 });
89

@@ -100,11 +101,17 @@ describe("RunEngine pending version", () => {
100101
await setupBackgroundWorker(engine, authenticatedEnvironment, ["test-task-other"]);
101102

102103
//dequeuing should fail
103-
const dequeued = await engine.dequeueFromMasterQueue({
104-
consumerId: "test_12345",
105-
masterQueue: run.masterQueue,
106-
maxRunCount: 10,
107-
});
104+
105+
const dequeued: DequeuedMessage[] = [];
106+
for (let i = 0; i < 2; i++) {
107+
dequeued.push(
108+
...(await engine.dequeueFromMasterQueue({
109+
consumerId: "test_12345",
110+
masterQueue: "main",
111+
maxRunCount: 1,
112+
}))
113+
);
114+
}
108115
expect(dequeued.length).toBe(0);
109116

110117
//queue should be empty

internal-packages/run-engine/src/engine/tests/priority.test.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { PrismaClientOrTransaction } from "@trigger.dev/database";
66
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
77
import { setTimeout } from "timers/promises";
88
import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
9+
import { DequeuedMessage } from "@trigger.dev/core/v3";
910

1011
vi.setConfig({ testTimeout: 60_000 });
1112

@@ -76,12 +77,16 @@ describe("RunEngine priority", () => {
7677
const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
7778
expect(queueLength).toBe(priorities.length);
7879

79-
//dequeue (expect 4 items because of the negative priority)
80-
const dequeue = await engine.dequeueFromMasterQueue({
81-
consumerId: "test_12345",
82-
masterQueue: "main",
83-
maxRunCount: 20,
84-
});
80+
//dequeue 4 times, in order
81+
const dequeue: DequeuedMessage[] = [];
82+
for (let i = 0; i < 4; i++) {
83+
const items = await engine.dequeueFromMasterQueue({
84+
consumerId: "test_12345",
85+
masterQueue: "main",
86+
maxRunCount: 1,
87+
});
88+
dequeue.push(...items);
89+
}
8590
expect(dequeue.length).toBe(4);
8691
expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId);
8792
expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId);
@@ -175,11 +180,16 @@ describe("RunEngine priority", () => {
175180
expect(queueLength).toBe(queueTimestamps.length);
176181

177182
//dequeue (expect 4 items because of the negative priority)
178-
const dequeue = await engine.dequeueFromMasterQueue({
179-
consumerId: "test_12345",
180-
masterQueue: "main",
181-
maxRunCount: 20,
182-
});
183+
const dequeue: DequeuedMessage[] = [];
184+
for (let i = 0; i < 5; i++) {
185+
dequeue.push(
186+
...(await engine.dequeueFromMasterQueue({
187+
consumerId: "test_12345",
188+
masterQueue: "main",
189+
maxRunCount: 1,
190+
}))
191+
);
192+
}
183193
expect(dequeue.length).toBe(queueTimestamps.length);
184194
expect(dequeue[0].run.friendlyId).toBe(runs[2].friendlyId);
185195
expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId);

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ describe("RunQueue", () => {
355355
}
356356
);
357357

358-
redisTest(
358+
// This test fails now because we only return a single run per env. We will change this in the future.
359+
redisTest.fails(
359360
"Dequeue multiple messages from the queue",
360361
{ timeout: 5_000 },
361362
async ({ redisContainer }) => {

0 commit comments

Comments
 (0)