Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions sdks/typescript/src/apache_beam/worker/operators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
keyCoder: Coder<unknown>;
windowCoder: Coder<Window>;

// Use a Map for LRU ordering: JavaScript Maps preserve insertion order,
// so we can implement LRU by deleting and re-inserting on access.
groups: Map<string, A>;
maxKeys: number = 10000;

Expand Down Expand Up @@ -468,24 +470,36 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
);
}

/**
 * Marks `wkey` as the most recently used entry of the cache.
 *
 * A Map iterates in insertion order, so removing the entry and inserting it
 * again places it at the very end of `this.groups`.
 *
 * @param wkey cache key of the entry to refresh.
 * @param value accumulator to store under `wkey`.
 */
private touchKey(wkey: string, value: A) {
  const cache = this.groups;
  // Dropping the old slot first is what makes the following set() append.
  cache.delete(wkey);
  cache.set(wkey, value);
}

process(wvalue: WindowedValue<any>) {
for (const window of wvalue.windows) {
const wkey =
encodeToBase64(window, this.windowCoder) +
" " +
encodeToBase64(wvalue.value.key, this.keyCoder);
if (!this.groups.has(wkey)) {
this.groups.set(wkey, this.combineFn.createAccumulator());
}
this.groups.set(
wkey,
this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value),
);
const existingAccumulator = this.groups.get(wkey);
const newAccumulator =
existingAccumulator !== undefined
? this.combineFn.addInput(existingAccumulator, wvalue.value.value)
: this.combineFn.addInput(
this.combineFn.createAccumulator(),
wvalue.value.value,
);
// Move to end (most recently used) by delete + set
this.touchKey(wkey, newAccumulator);
}
if (this.groups.size > this.maxKeys) {
// Flush a random 10% of the map to make more room.
// TODO: Tune this, or better use LRU or ARC for this cache.
return this.flush(this.maxKeys * 0.9);
// Flush the least recently used entries (at the front of the Map)
// until we're back under maxKeys.
return this.flushLRU(this.maxKeys);
} else {
return NonPromise;
}
Expand All @@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
this.groups = new Map();
}

/**
 * Emits and evicts the least recently used groups.
 *
 * Walks `this.groups` from its first entry (the front of the Map holds the
 * entries that were inserted or re-inserted longest ago) and hands each
 * accumulator to the receiver, stopping as soon as the number of entries
 * that would remain is at or below `target`. The emitted entries are removed
 * from the Map only after the walk has finished.
 *
 * @param target number of entries allowed to stay in the cache.
 * @returns the value built by the ProcessResultBuilder after adding the
 *     result of every `receive` call that was made.
 */
flushLRU(target: number): ProcessResult {
  const builder = new ProcessResultBuilder();
  const evicted: string[] = [];
  for (const [wkey, accumulator] of this.groups) {
    const remaining = this.groups.size - evicted.length;
    if (remaining <= target) {
      break;
    }
    // Cache keys have the form "<base64 window> <base64 key>".
    const [encodedWindow, encodedKey] = wkey.split(" ");
    const window = decodeFromBase64(encodedWindow, this.windowCoder);
    const key = decodeFromBase64(encodedKey, this.keyCoder);
    builder.add(
      this.receiver.receive({
        value: { key, value: accumulator },
        windows: [window],
        timestamp: window.maxTimestamp(),
        pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
      }),
    );
    evicted.push(wkey);
  }
  // Remove the emitted entries once iteration over the Map is complete.
  for (const wkey of evicted) {
    this.groups.delete(wkey);
  }
  return builder.build();
}

/**
* Flushes all entries from the cache.
*/
flush(target: number): ProcessResult {
const result = new ProcessResultBuilder();
const toDelete: string[] = [];
Expand Down