Skip to content

Commit 65b91a9

Browse files
committed
Reduce contention on batchTaskRun when setting expected count
1 parent 53e6d47 commit 65b91a9

File tree

2 files changed

+105
-64
lines changed

2 files changed

+105
-64
lines changed

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService {
760760
}));
761761

762762
let workingIndex = currentIndex;
763+
let expectedCount = 0;
763764

764765
for (const item of itemsToProcess) {
765766
try {
766-
await this.#processBatchTaskRunItem(batch, environment, item, workingIndex, options);
767+
const created = await this.#processBatchTaskRunItem(
768+
batch,
769+
environment,
770+
item,
771+
workingIndex,
772+
options
773+
);
774+
775+
if (created) {
776+
expectedCount++;
777+
}
767778

768779
workingIndex++;
769780
} catch (error) {
@@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService {
780791
}
781792
}
782793

794+
if (expectedCount > 0) {
795+
await this._prisma.batchTaskRun.update({
796+
where: { id: batch.id },
797+
data: {
798+
expectedCount: {
799+
increment: expectedCount,
800+
},
801+
},
802+
});
803+
}
804+
783805
return { workingIndex };
784806
}
785807

@@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService {
825847

826848
if (!result.isCached) {
827849
try {
828-
await $transaction(this._prisma, async (tx) => {
829-
// [batchTaskRunId, taskRunId] is a unique index
830-
await tx.batchTaskRunItem.create({
831-
data: {
832-
batchTaskRunId: batch.id,
833-
taskRunId: result.run.id,
834-
status: batchTaskRunItemStatusForRunStatus(result.run.status),
835-
},
836-
});
837-
838-
await tx.batchTaskRun.update({
839-
where: { id: batch.id },
840-
data: { expectedCount: { increment: 1 } },
841-
});
850+
await this._prisma.batchTaskRunItem.create({
851+
data: {
852+
batchTaskRunId: batch.id,
853+
taskRunId: result.run.id,
854+
status: batchTaskRunItemStatusForRunStatus(result.run.status),
855+
},
842856
});
857+
858+
return true;
843859
} catch (error) {
844860
if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) {
845861
// This means there is already a batchTaskRunItem for this batch and taskRun
@@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService {
852868
}
853869
);
854870

855-
return;
871+
return false;
856872
}
857873

858874
throw error;
859875
}
860876
}
877+
878+
return false;
861879
}
862880

863881
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
@@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3(
907925
scheduleResumeOnComplete = false,
908926
taskRunAttemptId?: string
909927
) {
910-
await $transaction(tx, "completeBatchTaskRunItemV3", async (tx, span) => {
911-
span?.setAttribute("batch_id", batchTaskRunId);
912-
913-
// Update the item to complete
914-
const updated = await tx.batchTaskRunItem.updateMany({
915-
where: {
916-
id: itemId,
917-
status: "PENDING",
918-
},
919-
data: {
920-
status: "COMPLETED",
921-
taskRunAttemptId,
922-
},
923-
});
924-
925-
if (updated.count === 0) {
926-
return;
927-
}
928-
929-
const updatedBatchRun = await tx.batchTaskRun.update({
930-
where: {
931-
id: batchTaskRunId,
932-
},
933-
data: {
934-
completedCount: {
935-
increment: 1,
928+
await $transaction(
929+
tx,
930+
"completeBatchTaskRunItemV3",
931+
async (tx, span) => {
932+
span?.setAttribute("batch_id", batchTaskRunId);
933+
934+
// Update the item to complete
935+
const updated = await tx.batchTaskRunItem.updateMany({
936+
where: {
937+
id: itemId,
938+
status: "PENDING",
936939
},
937-
},
938-
select: {
939-
sealed: true,
940-
status: true,
941-
completedCount: true,
942-
expectedCount: true,
943-
dependentTaskAttemptId: true,
944-
},
945-
});
940+
data: {
941+
status: "COMPLETED",
942+
taskRunAttemptId,
943+
},
944+
});
945+
946+
if (updated.count === 0) {
947+
return;
948+
}
946949

947-
if (
948-
updatedBatchRun.status === "PENDING" &&
949-
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
950-
updatedBatchRun.sealed
951-
) {
952-
await tx.batchTaskRun.update({
950+
const updatedBatchRun = await tx.batchTaskRun.update({
953951
where: {
954952
id: batchTaskRunId,
955953
},
956954
data: {
957-
status: "COMPLETED",
958-
completedAt: new Date(),
955+
completedCount: {
956+
increment: 1,
957+
},
958+
},
959+
select: {
960+
sealed: true,
961+
status: true,
962+
completedCount: true,
963+
expectedCount: true,
964+
dependentTaskAttemptId: true,
959965
},
960966
});
961967

962-
// We only need to resume the batch if it has a dependent task attempt ID
963-
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
964-
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
968+
if (
969+
updatedBatchRun.status === "PENDING" &&
970+
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
971+
updatedBatchRun.sealed
972+
) {
973+
await tx.batchTaskRun.update({
974+
where: {
975+
id: batchTaskRunId,
976+
},
977+
data: {
978+
status: "COMPLETED",
979+
completedAt: new Date(),
980+
},
981+
});
982+
983+
// We only need to resume the batch if it has a dependent task attempt ID
984+
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
985+
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
986+
}
965987
}
988+
},
989+
{
990+
timeout: 10000,
966991
}
967-
});
992+
);
968993
}

internal-packages/database/src/transaction.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,21 @@ export type PrismaTransactionOptions = {
3030
isolationLevel?: Prisma.TransactionIsolationLevel;
3131

3232
swallowPrismaErrors?: boolean;
33+
34+
/**
35+
* The maximum number of times the transaction will be retried in case of a serialization failure. The default value is 0.
36+
*
37+
* See https://www.prisma.io/docs/orm/prisma-client/queries/transactions#transaction-timing-issues
38+
*/
39+
maxRetries?: number;
3340
};
3441

3542
export async function $transaction<R>(
3643
prisma: PrismaClientOrTransaction,
3744
fn: (prisma: PrismaTransactionClient) => Promise<R>,
3845
prismaError: (error: Prisma.PrismaClientKnownRequestError) => void,
39-
options?: PrismaTransactionOptions
46+
options?: PrismaTransactionOptions,
47+
attempt = 0
4048
): Promise<R | undefined> {
4149
if (isTransactionClient(prisma)) {
4250
return fn(prisma);
@@ -46,6 +54,14 @@ export async function $transaction<R>(
4654
return await (prisma as PrismaClient).$transaction(fn, options);
4755
} catch (error) {
4856
if (isPrismaKnownError(error)) {
57+
if (
58+
error.code === "P2034" &&
59+
typeof options?.maxRetries === "number" &&
60+
attempt < options.maxRetries
61+
) {
62+
return $transaction(prisma, fn, prismaError, options, attempt + 1);
63+
}
64+
4965
prismaError(error);
5066

5167
if (options?.swallowPrismaErrors) {

0 commit comments

Comments
 (0)