Skip to content

Remove batch ID carryover for non-batch waits #1930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ export class WaitpointSystem {
environmentType: snapshot.environmentType,
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: batch?.id ?? snapshot.batchId ?? undefined,
// Do NOT carry over the batchId from the previous snapshot
batchId: batch?.id,
workerId,
runnerId,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,230 @@ describe("RunEngine batchTriggerAndWait", () => {
engine.quit();
}
});

containerTest(
"batch ID should not carry over to triggerAndWait",
async ({ prisma, redisOptions }) => {
//create environment
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 20,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0001,
},
tracer: trace.getTracer("test", "0.0.0"),
});

try {
const parentTask = "parent-task";
const batchChildTask = "batch-child-task";
const triggerAndWaitChildTask = "trigger-and-wait-child-task";

//create background worker
await setupBackgroundWorker(engine, authenticatedEnvironment, [
parentTask,
batchChildTask,
triggerAndWaitChildTask,
]);

//create a batch
const batch = await prisma.batchTaskRun.create({
data: {
friendlyId: generateFriendlyId("batch"),
runtimeEnvironmentId: authenticatedEnvironment.id,
},
});

//trigger the parent run
const parentRun = await engine.trigger(
{
number: 1,
friendlyId: "run_p1234",
environment: authenticatedEnvironment,
taskIdentifier: parentTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
masterQueue: "main",
queue: `task/${parentTask}`,
isTest: false,
tags: [],
},
prisma
);

//dequeue parent
const dequeued = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: parentRun.masterQueue,
maxRunCount: 10,
});

//create an attempt
const initialExecutionData = await engine.getRunExecutionData({ runId: parentRun.id });
assertNonNullable(initialExecutionData);
const attemptResult = await engine.startRunAttempt({
runId: parentRun.id,
snapshotId: initialExecutionData.snapshot.id,
});

//block using the batch
await engine.blockRunWithCreatedBatch({
runId: parentRun.id,
batchId: batch.id,
environmentId: authenticatedEnvironment.id,
projectId: authenticatedEnvironment.projectId,
organizationId: authenticatedEnvironment.organizationId,
});

const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id });
assertNonNullable(afterBlockedByBatch);
expect(afterBlockedByBatch.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
expect(afterBlockedByBatch.batch?.id).toBe(batch.id);

//create a batch child
const batchChild = await engine.trigger(
{
number: 1,
friendlyId: "run_c1234",
environment: authenticatedEnvironment,
taskIdentifier: batchChildTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t12345",
spanId: "s12345",
masterQueue: "main",
queue: `task/${batchChildTask}`,
isTest: false,
tags: [],
resumeParentOnCompletion: true,
parentTaskRunId: parentRun.id,
batch: { id: batch.id, index: 0 },
},
prisma
);

const parentAfterBatchChild = await engine.getRunExecutionData({ runId: parentRun.id });
assertNonNullable(parentAfterBatchChild);
expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
expect(parentAfterBatchChild.batch?.id).toBe(batch.id);

await engine.unblockRunForCreatedBatch({
runId: parentRun.id,
batchId: batch.id,
environmentId: authenticatedEnvironment.id,
projectId: authenticatedEnvironment.projectId,
});

//dequeue and start the batch child
const dequeuedBatchChild = await engine.dequeueFromMasterQueue({
consumerId: "test_12345",
masterQueue: batchChild.masterQueue,
maxRunCount: 1,
});

expect(dequeuedBatchChild.length).toBe(1);

const batchChildAttempt = await engine.startRunAttempt({
runId: batchChild.id,
snapshotId: dequeuedBatchChild[0].snapshot.id,
});

//complete the batch child
await engine.completeRunAttempt({
runId: batchChildAttempt.run.id,
snapshotId: batchChildAttempt.snapshot.id,
completion: {
id: batchChild.id,
ok: true,
output: '{"foo":"bar"}',
outputType: "application/json",
},
});

await setTimeout(500);

const runWaitpointsAfterBatchChild = await prisma.taskRunWaitpoint.findMany({
where: {
taskRunId: parentRun.id,
},
include: {
waitpoint: true,
},
});
expect(runWaitpointsAfterBatchChild.length).toBe(0);

//parent snapshot
const parentExecutionDataAfterBatchChildComplete = await engine.getRunExecutionData({
runId: parentRun.id,
});
assertNonNullable(parentExecutionDataAfterBatchChildComplete);
expect(parentExecutionDataAfterBatchChildComplete.snapshot.executionStatus).toBe(
"EXECUTING"
);
expect(parentExecutionDataAfterBatchChildComplete.batch?.id).toBe(batch.id);
expect(parentExecutionDataAfterBatchChildComplete.completedWaitpoints.length).toBe(2);

//now triggerAndWait
const triggerAndWaitChildRun = await engine.trigger(
{
number: 1,
friendlyId: "run_c123456",
environment: authenticatedEnvironment,
taskIdentifier: triggerAndWaitChildTask,
payload: "{}",
payloadType: "application/json",
context: {},
traceContext: {},
traceId: "t123456",
spanId: "s123456",
masterQueue: "main",
queue: `task/${triggerAndWaitChildTask}`,
isTest: false,
tags: [],
resumeParentOnCompletion: true,
parentTaskRunId: parentRun.id,
},
prisma
);

//check that the parent's execution data doesn't have a batch ID
const parentAfterTriggerAndWait = await engine.getRunExecutionData({ runId: parentRun.id });
assertNonNullable(parentAfterTriggerAndWait);
expect(parentAfterTriggerAndWait.snapshot.executionStatus).toBe(
"EXECUTING_WITH_WAITPOINTS"
);
expect(parentAfterTriggerAndWait.batch).toBeUndefined();
} finally {
engine.quit();
}
}
);
});