Skip to content

Commit 4312701

Browse files
committed
wip
1 parent 2a2825f commit 4312701

File tree

2 files changed

+62
-28
lines changed

2 files changed

+62
-28
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
112112
} catch (error) {
113113
// Detect a prisma transaction Unique constraint violation
114114
if (error instanceof Prisma.PrismaClientKnownRequestError) {
115-
logger.debug("BatchTriggerV3: Prisma transaction error", {
115+
logger.debug("RunEngineBatchTrigger: Prisma transaction error", {
116116
code: error.code,
117117
message: error.message,
118118
meta: error.meta,
@@ -188,15 +188,15 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
188188

189189
switch (result.status) {
190190
case "COMPLETE": {
191-
logger.debug("[BatchTriggerV3][call] Batch inline processing complete", {
191+
logger.debug("[RunEngineBatchTrigger][call] Batch inline processing complete", {
192192
batchId: batch.friendlyId,
193193
currentIndex: 0,
194194
});
195195

196196
return batch;
197197
}
198198
case "INCOMPLETE": {
199-
logger.debug("[BatchTriggerV3][call] Batch inline processing incomplete", {
199+
logger.debug("[RunEngineBatchTrigger][call] Batch inline processing incomplete", {
200200
batchId: batch.friendlyId,
201201
currentIndex: result.workingIndex,
202202
});
@@ -218,7 +218,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
218218
return batch;
219219
}
220220
case "ERROR": {
221-
logger.error("[BatchTriggerV3][call] Batch inline processing error", {
221+
logger.error("[RunEngineBatchTrigger][call] Batch inline processing error", {
222222
batchId: batch.friendlyId,
223223
currentIndex: result.workingIndex,
224224
error: result.error,
@@ -317,15 +317,15 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
317317
}
318318

319319
async processBatchTaskRun(options: BatchProcessingOptions) {
320-
logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch", {
320+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch", {
321321
options,
322322
});
323323

324324
const $attemptCount = options.attemptCount + 1;
325325

326326
// Add early return if max attempts reached
327327
if ($attemptCount > MAX_ATTEMPTS) {
328-
logger.error("[BatchTriggerV3][processBatchTaskRun] Max attempts reached", {
328+
logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Max attempts reached", {
329329
options,
330330
attemptCount: $attemptCount,
331331
});
@@ -351,12 +351,15 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
351351

352352
// Check to make sure the currentIndex is not greater than the runCount
353353
if (options.range.start >= batch.runCount) {
354-
logger.debug("[BatchTriggerV3][processBatchTaskRun] currentIndex is greater than runCount", {
355-
options,
356-
batchId: batch.friendlyId,
357-
runCount: batch.runCount,
358-
attemptCount: $attemptCount,
359-
});
354+
logger.debug(
355+
"[RunEngineBatchTrigger][processBatchTaskRun] currentIndex is greater than runCount",
356+
{
357+
options,
358+
batchId: batch.friendlyId,
359+
runCount: batch.runCount,
360+
attemptCount: $attemptCount,
361+
}
362+
);
360363

361364
return;
362365
}
@@ -373,7 +376,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
373376
const payload = await parsePacket(payloadPacket);
374377

375378
if (!payload) {
376-
logger.debug("[BatchTriggerV3][processBatchTaskRun] Failed to parse payload", {
379+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Failed to parse payload", {
377380
options,
378381
batchId: batch.friendlyId,
379382
attemptCount: $attemptCount,
@@ -399,7 +402,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
399402

400403
switch (result.status) {
401404
case "COMPLETE": {
402-
logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing complete", {
405+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing complete", {
403406
options,
404407
batchId: batch.friendlyId,
405408
attemptCount: $attemptCount,
@@ -408,7 +411,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
408411
return;
409412
}
410413
case "INCOMPLETE": {
411-
logger.debug("[BatchTriggerV3][processBatchTaskRun] Batch processing incomplete", {
414+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing incomplete", {
412415
batchId: batch.friendlyId,
413416
currentIndex: result.workingIndex,
414417
attemptCount: $attemptCount,
@@ -434,7 +437,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
434437
return;
435438
}
436439
case "ERROR": {
437-
logger.error("[BatchTriggerV3][processBatchTaskRun] Batch processing error", {
440+
logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Batch processing error", {
438441
batchId: batch.friendlyId,
439442
currentIndex: result.workingIndex,
440443
error: result.error,
@@ -505,7 +508,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
505508
// Grab the next PROCESSING_BATCH_SIZE items
506509
const itemsToProcess = items.slice(currentIndex, currentIndex + batchSize);
507510

508-
logger.debug("[BatchTriggerV3][processBatchTaskRun] Processing batch items", {
511+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch items", {
509512
batchId: batch.friendlyId,
510513
currentIndex,
511514
runCount: batch.runCount,
@@ -528,19 +531,19 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
528531
});
529532

530533
if (!run) {
531-
logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", {
534+
logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item", {
532535
batchId: batch.friendlyId,
533536
currentIndex: workingIndex,
534537
});
535538

536-
throw new Error("[BatchTriggerV3][processBatchTaskRun] Failed to process item");
539+
throw new Error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item");
537540
}
538541

539542
runIds.push(run.friendlyId);
540543

541544
workingIndex++;
542545
} catch (error) {
543-
logger.error("[BatchTriggerV3][processBatchTaskRun] Failed to process item", {
546+
logger.error("[RunEngineBatchTrigger][processBatchTaskRun] Failed to process item", {
544547
batchId: batch.friendlyId,
545548
currentIndex: workingIndex,
546549
error,
@@ -595,7 +598,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
595598
parentRunId: string | undefined;
596599
resumeParentOnCompletion: boolean | undefined;
597600
}) {
598-
logger.debug("[BatchTriggerV3][processBatchTaskRunItem] Processing item", {
601+
logger.debug("[RunEngineBatchTrigger][processBatchTaskRunItem] Processing item", {
599602
batchId: batch.friendlyId,
600603
currentIndex,
601604
});
@@ -634,7 +637,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
634637
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
635638
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
636639
tx,
637-
jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`,
640+
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
638641
});
639642
}
640643

references/hello-world/src/trigger/deadlocks.ts

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,53 @@ const deadlockQueue = queue({
66
releaseConcurrencyOnWaitpoint: false,
77
});
88

9+
export const deadlockReleasingQueue = queue({
10+
name: "deadlock-releasing-queue",
11+
releaseConcurrencyOnWaitpoint: true,
12+
});
13+
914
export const deadlockTester = task({
1015
id: "deadlock-tester",
1116
run: async (payload: any, { ctx }) => {
12-
return await deadlockNestedTask.triggerAndWait({
13-
message: "Hello, world!",
14-
});
17+
// await deadlockNestedTask.triggerAndWait({
18+
// message: "Hello, world!",
19+
// });
20+
21+
await deadlockNestedTask.batchTriggerAndWait([
22+
{
23+
payload: {
24+
message: "Hello, world!",
25+
},
26+
},
27+
{
28+
payload: {
29+
message: "Hello, world!",
30+
},
31+
},
32+
]);
1533
},
1634
});
1735

1836
export const deadlockNestedTask = task({
1937
id: "deadlock-nested-task",
2038
queue: deadlockQueue,
2139
run: async (payload: any, { ctx }) => {
22-
await deadlockTester.triggerAndWait({
23-
message: "Hello, world!",
24-
});
40+
// await deadlockTester.triggerAndWait({
41+
// message: "Hello, world!",
42+
// });
43+
44+
await deadlockTester.batchTriggerAndWait([
45+
{
46+
payload: {
47+
message: "Hello, world!",
48+
},
49+
},
50+
{
51+
payload: {
52+
message: "Hello, world!",
53+
},
54+
},
55+
]);
2556

2657
return {
2758
message: "Hello, world!",

0 commit comments

Comments
 (0)