Skip to content

Commit 2957ee9

Browse files
authored
Remove batch ID carryover for non-batch waits (#1930)
* add failing test case * do not carry over previous batch id when blocking with waitpoint * delete irrelevant test
1 parent bcef969 commit 2957ee9

File tree

2 files changed

+228
-1
lines changed

2 files changed

+228
-1
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,8 @@ export class WaitpointSystem {
422422
environmentType: snapshot.environmentType,
423423
projectId: snapshot.projectId,
424424
organizationId: snapshot.organizationId,
425-
batchId: batch?.id ?? snapshot.batchId ?? undefined,
425+
// Do NOT carry over the batchId from the previous snapshot
426+
batchId: batch?.id,
426427
workerId,
427428
runnerId,
428429
});

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,230 @@ describe("RunEngine batchTriggerAndWait", () => {
362362
engine.quit();
363363
}
364364
});
365+
366+
containerTest(
367+
"batch ID should not carry over to triggerAndWait",
368+
async ({ prisma, redisOptions }) => {
369+
//create environment
370+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
371+
372+
const engine = new RunEngine({
373+
prisma,
374+
worker: {
375+
redis: redisOptions,
376+
workers: 1,
377+
tasksPerWorker: 10,
378+
pollIntervalMs: 20,
379+
},
380+
queue: {
381+
redis: redisOptions,
382+
},
383+
runLock: {
384+
redis: redisOptions,
385+
},
386+
machines: {
387+
defaultMachine: "small-1x",
388+
machines: {
389+
"small-1x": {
390+
name: "small-1x" as const,
391+
cpu: 0.5,
392+
memory: 0.5,
393+
centsPerMs: 0.0001,
394+
},
395+
},
396+
baseCostInCents: 0.0001,
397+
},
398+
tracer: trace.getTracer("test", "0.0.0"),
399+
});
400+
401+
try {
402+
const parentTask = "parent-task";
403+
const batchChildTask = "batch-child-task";
404+
const triggerAndWaitChildTask = "trigger-and-wait-child-task";
405+
406+
//create background worker
407+
await setupBackgroundWorker(engine, authenticatedEnvironment, [
408+
parentTask,
409+
batchChildTask,
410+
triggerAndWaitChildTask,
411+
]);
412+
413+
//create a batch
414+
const batch = await prisma.batchTaskRun.create({
415+
data: {
416+
friendlyId: generateFriendlyId("batch"),
417+
runtimeEnvironmentId: authenticatedEnvironment.id,
418+
},
419+
});
420+
421+
//trigger the parent run
422+
const parentRun = await engine.trigger(
423+
{
424+
number: 1,
425+
friendlyId: "run_p1234",
426+
environment: authenticatedEnvironment,
427+
taskIdentifier: parentTask,
428+
payload: "{}",
429+
payloadType: "application/json",
430+
context: {},
431+
traceContext: {},
432+
traceId: "t12345",
433+
spanId: "s12345",
434+
masterQueue: "main",
435+
queue: `task/${parentTask}`,
436+
isTest: false,
437+
tags: [],
438+
},
439+
prisma
440+
);
441+
442+
//dequeue parent
443+
const dequeued = await engine.dequeueFromMasterQueue({
444+
consumerId: "test_12345",
445+
masterQueue: parentRun.masterQueue,
446+
maxRunCount: 10,
447+
});
448+
449+
//create an attempt
450+
const initialExecutionData = await engine.getRunExecutionData({ runId: parentRun.id });
451+
assertNonNullable(initialExecutionData);
452+
const attemptResult = await engine.startRunAttempt({
453+
runId: parentRun.id,
454+
snapshotId: initialExecutionData.snapshot.id,
455+
});
456+
457+
//block using the batch
458+
await engine.blockRunWithCreatedBatch({
459+
runId: parentRun.id,
460+
batchId: batch.id,
461+
environmentId: authenticatedEnvironment.id,
462+
projectId: authenticatedEnvironment.projectId,
463+
organizationId: authenticatedEnvironment.organizationId,
464+
});
465+
466+
const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id });
467+
assertNonNullable(afterBlockedByBatch);
468+
expect(afterBlockedByBatch.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
469+
expect(afterBlockedByBatch.batch?.id).toBe(batch.id);
470+
471+
//create a batch child
472+
const batchChild = await engine.trigger(
473+
{
474+
number: 1,
475+
friendlyId: "run_c1234",
476+
environment: authenticatedEnvironment,
477+
taskIdentifier: batchChildTask,
478+
payload: "{}",
479+
payloadType: "application/json",
480+
context: {},
481+
traceContext: {},
482+
traceId: "t12345",
483+
spanId: "s12345",
484+
masterQueue: "main",
485+
queue: `task/${batchChildTask}`,
486+
isTest: false,
487+
tags: [],
488+
resumeParentOnCompletion: true,
489+
parentTaskRunId: parentRun.id,
490+
batch: { id: batch.id, index: 0 },
491+
},
492+
prisma
493+
);
494+
495+
const parentAfterBatchChild = await engine.getRunExecutionData({ runId: parentRun.id });
496+
assertNonNullable(parentAfterBatchChild);
497+
expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
498+
expect(parentAfterBatchChild.batch?.id).toBe(batch.id);
499+
500+
await engine.unblockRunForCreatedBatch({
501+
runId: parentRun.id,
502+
batchId: batch.id,
503+
environmentId: authenticatedEnvironment.id,
504+
projectId: authenticatedEnvironment.projectId,
505+
});
506+
507+
//dequeue and start the batch child
508+
const dequeuedBatchChild = await engine.dequeueFromMasterQueue({
509+
consumerId: "test_12345",
510+
masterQueue: batchChild.masterQueue,
511+
maxRunCount: 1,
512+
});
513+
514+
expect(dequeuedBatchChild.length).toBe(1);
515+
516+
const batchChildAttempt = await engine.startRunAttempt({
517+
runId: batchChild.id,
518+
snapshotId: dequeuedBatchChild[0].snapshot.id,
519+
});
520+
521+
//complete the batch child
522+
await engine.completeRunAttempt({
523+
runId: batchChildAttempt.run.id,
524+
snapshotId: batchChildAttempt.snapshot.id,
525+
completion: {
526+
id: batchChild.id,
527+
ok: true,
528+
output: '{"foo":"bar"}',
529+
outputType: "application/json",
530+
},
531+
});
532+
533+
await setTimeout(500);
534+
535+
const runWaitpointsAfterBatchChild = await prisma.taskRunWaitpoint.findMany({
536+
where: {
537+
taskRunId: parentRun.id,
538+
},
539+
include: {
540+
waitpoint: true,
541+
},
542+
});
543+
expect(runWaitpointsAfterBatchChild.length).toBe(0);
544+
545+
//parent snapshot
546+
const parentExecutionDataAfterBatchChildComplete = await engine.getRunExecutionData({
547+
runId: parentRun.id,
548+
});
549+
assertNonNullable(parentExecutionDataAfterBatchChildComplete);
550+
expect(parentExecutionDataAfterBatchChildComplete.snapshot.executionStatus).toBe(
551+
"EXECUTING"
552+
);
553+
expect(parentExecutionDataAfterBatchChildComplete.batch?.id).toBe(batch.id);
554+
expect(parentExecutionDataAfterBatchChildComplete.completedWaitpoints.length).toBe(2);
555+
556+
//now triggerAndWait
557+
const triggerAndWaitChildRun = await engine.trigger(
558+
{
559+
number: 1,
560+
friendlyId: "run_c123456",
561+
environment: authenticatedEnvironment,
562+
taskIdentifier: triggerAndWaitChildTask,
563+
payload: "{}",
564+
payloadType: "application/json",
565+
context: {},
566+
traceContext: {},
567+
traceId: "t123456",
568+
spanId: "s123456",
569+
masterQueue: "main",
570+
queue: `task/${triggerAndWaitChildTask}`,
571+
isTest: false,
572+
tags: [],
573+
resumeParentOnCompletion: true,
574+
parentTaskRunId: parentRun.id,
575+
},
576+
prisma
577+
);
578+
579+
//check that the parent's execution data doesn't have a batch ID
580+
const parentAfterTriggerAndWait = await engine.getRunExecutionData({ runId: parentRun.id });
581+
assertNonNullable(parentAfterTriggerAndWait);
582+
expect(parentAfterTriggerAndWait.snapshot.executionStatus).toBe(
583+
"EXECUTING_WITH_WAITPOINTS"
584+
);
585+
expect(parentAfterTriggerAndWait.batch).toBeUndefined();
586+
} finally {
587+
engine.quit();
588+
}
589+
}
590+
);
365591
});

0 commit comments

Comments
 (0)