Skip to content

Commit 0f66023

Browse files
authored
v4: metadata collapsing now preserves operation order for correctness (#2115)
* v4: metadata collapsing now preserves operation order for correctness * Add changeset
1 parent 021d6d8 commit 0f66023

File tree

3 files changed

+126
-106
lines changed

3 files changed

+126
-106
lines changed

.changeset/nasty-cobras-wonder.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Fix metadata collapsing correctness

packages/core/src/v3/runMetadata/operations.ts

Lines changed: 57 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -173,65 +173,68 @@ export function collapseOperations(
173173
return operations;
174174
}
175175

176-
// Maps to track collapsible operations
177-
const incrementsByKey = new Map<string, number>();
178-
const setsByKey = new Map<string, RunMetadataChangeOperation>();
179-
const deletesByKey = new Set<string>();
180-
const preservedOperations: RunMetadataChangeOperation[] = [];
181-
182-
// Process operations in order
183-
for (const operation of operations) {
184-
switch (operation.type) {
185-
case "increment": {
186-
const currentIncrement = incrementsByKey.get(operation.key) || 0;
187-
incrementsByKey.set(operation.key, currentIncrement + operation.value);
188-
break;
189-
}
190-
case "set": {
191-
// Keep only the last set operation for each key
192-
setsByKey.set(operation.key, operation);
193-
break;
194-
}
195-
case "delete": {
196-
// Keep only one delete operation per key
197-
deletesByKey.add(operation.key);
198-
break;
199-
}
200-
case "append":
201-
case "remove":
202-
case "update": {
203-
// Preserve these operations as-is to maintain correctness
204-
preservedOperations.push(operation);
205-
break;
206-
}
207-
default: {
208-
// Handle any future operation types by preserving them
209-
preservedOperations.push(operation);
210-
break;
211-
}
176+
const collapsed: RunMetadataChangeOperation[] = [];
177+
let i = 0;
178+
while (i < operations.length) {
179+
const op = operations[i];
180+
if (!op) {
181+
i++;
182+
continue;
212183
}
213-
}
214184

215-
// Build the collapsed operations array
216-
const collapsedOperations: RunMetadataChangeOperation[] = [];
185+
// Collapse consecutive increments on the same key
186+
if (op.type === "increment") {
187+
let sum = op.value;
188+
let j = i + 1;
189+
while (
190+
j < operations.length &&
191+
operations[j]?.type === "increment" &&
192+
(operations[j] as typeof op)?.key === op.key
193+
) {
194+
sum += (operations[j] as typeof op).value;
195+
j++;
196+
}
197+
collapsed.push({ type: "increment", key: op.key, value: sum });
198+
i = j;
199+
continue;
200+
}
217201

218-
// Add collapsed increment operations
219-
for (const [key, value] of incrementsByKey) {
220-
collapsedOperations.push({ type: "increment", key, value });
221-
}
202+
// Collapse consecutive sets on the same key (keep only the last in the sequence)
203+
if (op.type === "set") {
204+
let last = op;
205+
let j = i + 1;
206+
while (
207+
j < operations.length &&
208+
operations[j]?.type === "set" &&
209+
(operations[j] as typeof op)?.key === op.key
210+
) {
211+
last = operations[j] as typeof op;
212+
j++;
213+
}
214+
collapsed.push(last);
215+
i = j;
216+
continue;
217+
}
222218

223-
// Add collapsed set operations
224-
for (const operation of setsByKey.values()) {
225-
collapsedOperations.push(operation);
226-
}
219+
// Collapse consecutive deletes on the same key (keep only one)
220+
if (op.type === "delete") {
221+
let j = i + 1;
222+
while (
223+
j < operations.length &&
224+
operations[j]?.type === "delete" &&
225+
(operations[j] as typeof op)?.key === op.key
226+
) {
227+
j++;
228+
}
229+
collapsed.push(op);
230+
i = j;
231+
continue;
232+
}
227233

228-
// Add collapsed delete operations
229-
for (const key of deletesByKey) {
230-
collapsedOperations.push({ type: "delete", key });
234+
// For append, remove, update, and unknown types, preserve order and do not collapse
235+
collapsed.push(op);
236+
i++;
231237
}
232238

233-
// Add preserved operations
234-
collapsedOperations.push(...preservedOperations);
235-
236-
return collapsedOperations;
239+
return collapsed;
237240
}

packages/core/test/standardMetadataManager.test.ts

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -395,34 +395,36 @@ describe("StandardMetadataManager", () => {
395395
const update = metadataUpdates[0]!;
396396
const operations = update.operations!;
397397

398-
// Should have: 1 collapsed increment, 1 collapsed set, 2 appends
399-
// (delete operations on non-existent keys are not queued)
400-
expect(operations).toHaveLength(4);
401-
402-
// Find each operation type
403-
const incrementOp = operations.find((op) => op.type === "increment");
404-
const setOp = operations.find((op) => op.type === "set");
405-
const appendOps = operations.filter((op) => op.type === "append");
398+
// With order-preserving collapse, expect 6 operations:
399+
// increment(counter, 5), set(status, processing), append(logs, ...), increment(counter, 3), set(status, completed), append(logs, ...)
400+
expect(operations).toHaveLength(6);
406401

407-
expect(incrementOp).toEqual({
402+
expect(operations[0]).toEqual({
408403
type: "increment",
409404
key: "counter",
410-
value: 8, // 5 + 3
405+
value: 5,
411406
});
412-
413-
expect(setOp).toEqual({
407+
expect(operations[1]).toEqual({
414408
type: "set",
415409
key: "status",
416-
value: "completed", // Last set value
410+
value: "processing",
417411
});
418-
419-
expect(appendOps).toHaveLength(2);
420-
expect(appendOps[0]).toEqual({
412+
expect(operations[2]).toEqual({
421413
type: "append",
422414
key: "logs",
423415
value: "Started processing",
424416
});
425-
expect(appendOps[1]).toEqual({
417+
expect(operations[3]).toEqual({
418+
type: "increment",
419+
key: "counter",
420+
value: 3,
421+
});
422+
expect(operations[4]).toEqual({
423+
type: "set",
424+
key: "status",
425+
value: "completed",
426+
});
427+
expect(operations[5]).toEqual({
426428
type: "append",
427429
key: "logs",
428430
value: "Processing completed",
@@ -454,42 +456,48 @@ describe("StandardMetadataManager", () => {
454456
const update = metadataUpdates[1]!;
455457
const operations = update.operations!;
456458

457-
// Should have: 1 collapsed increment, 1 collapsed set, 2 appends, 2 collapsed deletes
458-
expect(operations).toHaveLength(6);
459-
460-
// Find each operation type
461-
const incrementOp = operations.find((op) => op.type === "increment");
462-
const setOp = operations.find((op) => op.type === "set");
463-
const appendOps = operations.filter((op) => op.type === "append");
464-
const deleteOps = operations.filter((op) => op.type === "delete");
459+
// With order-preserving collapse, expect 8 operations:
460+
// increment(counter, 5), set(status, processing), append(logs, ...), increment(counter, 3), set(status, completed), append(logs, ...), delete(tempData1), delete(tempData2)
461+
expect(operations).toHaveLength(8);
465462

466-
expect(incrementOp).toEqual({
463+
expect(operations[0]).toEqual({
467464
type: "increment",
468465
key: "counter",
469-
value: 8, // 5 + 3
466+
value: 5,
470467
});
471-
472-
expect(setOp).toEqual({
468+
expect(operations[1]).toEqual({
473469
type: "set",
474470
key: "status",
475-
value: "completed", // Last set value
471+
value: "processing",
476472
});
477-
478-
expect(appendOps).toHaveLength(2);
479-
expect(appendOps[0]).toEqual({
473+
expect(operations[2]).toEqual({
480474
type: "append",
481475
key: "logs",
482476
value: "Started processing",
483477
});
484-
expect(appendOps[1]).toEqual({
478+
expect(operations[3]).toEqual({
479+
type: "increment",
480+
key: "counter",
481+
value: 3,
482+
});
483+
expect(operations[4]).toEqual({
484+
type: "set",
485+
key: "status",
486+
value: "completed",
487+
});
488+
expect(operations[5]).toEqual({
485489
type: "append",
486490
key: "logs",
487491
value: "Processing completed",
488492
});
489-
490-
expect(deleteOps).toHaveLength(2);
491-
const deleteKeys = deleteOps.map((op) => (op as any).key).sort();
492-
expect(deleteKeys).toEqual(["tempData1", "tempData2"]);
493+
expect(operations[6]).toEqual({
494+
type: "delete",
495+
key: "tempData1",
496+
});
497+
expect(operations[7]).toEqual({
498+
type: "delete",
499+
key: "tempData2",
500+
});
493501
});
494502

495503
test("should collapse operations across different keys independently", async () => {
@@ -504,26 +512,30 @@ describe("StandardMetadataManager", () => {
504512
expect(metadataUpdates).toHaveLength(1);
505513

506514
const update = metadataUpdates[0]!;
507-
expect(update.operations).toHaveLength(2);
508-
509-
// Should have separate collapsed increments for each key
510-
const filesOp = update.operations!.find(
511-
(op) => op.type === "increment" && (op as any).key === "filesProcessed"
512-
);
513-
const errorsOp = update.operations!.find(
514-
(op) => op.type === "increment" && (op as any).key === "errorsCount"
515-
);
515+
const operations = update.operations!;
516516

517-
expect(filesOp).toEqual({
517+
// With order-preserving collapse, expect 4 operations:
518+
// increment(filesProcessed, 10), increment(errorsCount, 1), increment(filesProcessed, 5), increment(errorsCount, 2)
519+
expect(operations).toHaveLength(4);
520+
expect(operations[0]).toEqual({
518521
type: "increment",
519522
key: "filesProcessed",
520-
value: 15, // 10 + 5
523+
value: 10,
521524
});
522-
523-
expect(errorsOp).toEqual({
525+
expect(operations[1]).toEqual({
526+
type: "increment",
527+
key: "errorsCount",
528+
value: 1,
529+
});
530+
expect(operations[2]).toEqual({
531+
type: "increment",
532+
key: "filesProcessed",
533+
value: 5,
534+
});
535+
expect(operations[3]).toEqual({
524536
type: "increment",
525537
key: "errorsCount",
526-
value: 3, // 1 + 2
538+
value: 2,
527539
});
528540
});
529541

0 commit comments

Comments
 (0)