Skip to content

Commit 02d92cd

Browse files
authored
[Task Manager] Handles case where buffer receives multiple entities with the same ID (#74943) (#75150)
Handles the case where two operations for the same entity make it into a single batched bulk operation and avoid the clashing ID issue that could cause the poller to hang and stop poling for work).
1 parent 17cf06e commit 02d92cd

File tree

4 files changed

+125
-11
lines changed

4 files changed

+125
-11
lines changed

x-pack/plugins/task_manager/server/buffered_task_store.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,39 @@ describe('Buffered Task Store', () => {
5858
);
5959
expect(await results[2]).toMatchObject(tasks[2]);
6060
});
61+
62+
test('handles multiple items with the same id', async () => {
63+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
64+
const bufferedStore = new BufferedTaskStore(taskStore, {});
65+
66+
const duplicateIdTask = mockTask();
67+
const tasks = [
68+
duplicateIdTask,
69+
mockTask(),
70+
mockTask(),
71+
{ ...mockTask(), id: duplicateIdTask.id },
72+
];
73+
74+
taskStore.bulkUpdate.mockResolvedValueOnce([
75+
asOk(tasks[0]),
76+
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
77+
asOk(tasks[2]),
78+
asOk(tasks[3]),
79+
]);
80+
81+
const results = [
82+
bufferedStore.update(tasks[0]),
83+
bufferedStore.update(tasks[1]),
84+
bufferedStore.update(tasks[2]),
85+
bufferedStore.update(tasks[3]),
86+
];
87+
expect(await results[0]).toMatchObject(tasks[0]);
88+
expect(results[1]).rejects.toMatchInlineSnapshot(
89+
`[Error: Oh no, something went terribly wrong]`
90+
);
91+
expect(await results[2]).toMatchObject(tasks[2]);
92+
expect(await results[3]).toMatchObject(tasks[3]);
93+
});
6194
});
6295
});
6396

x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import { createBuffer, Entity, OperationError, BulkOperation } from './bulk_operation_buffer';
88
import { mapErr, asOk, asErr, Ok, Err } from './result_type';
9+
import { mockLogger } from '../test_utils';
910

1011
interface TaskInstance extends Entity {
1112
attempts: number;
@@ -227,5 +228,38 @@ describe('Bulk Operation Buffer', () => {
227228
done();
228229
});
229230
});
231+
232+
test('logs unknown bulk operation results', async (done) => {
233+
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
234+
([task1, task2, task3]) => {
235+
return Promise.resolve([
236+
incrementAttempts(task1),
237+
errorAttempts(createTask()),
238+
incrementAttempts(createTask()),
239+
]);
240+
}
241+
);
242+
243+
const logger = mockLogger();
244+
245+
const bufferedUpdate = createBuffer(bulkUpdate, { logger });
246+
247+
const task1 = createTask();
248+
const task2 = createTask();
249+
const task3 = createTask();
250+
251+
return Promise.all([
252+
expect(bufferedUpdate(task1)).resolves.toMatchObject(incrementAttempts(task1)),
253+
expect(bufferedUpdate(task2)).rejects.toMatchObject(
254+
asErr(new Error(`Unhandled buffered operation for entity: ${task2.id}`))
255+
),
256+
expect(bufferedUpdate(task3)).rejects.toMatchObject(
257+
asErr(new Error(`Unhandled buffered operation for entity: ${task3.id}`))
258+
),
259+
]).then(() => {
260+
expect(logger.warn).toHaveBeenCalledTimes(2);
261+
done();
262+
});
263+
});
230264
});
231265
});

x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { keyBy, map } from 'lodash';
7+
import { map } from 'lodash';
88
import { Subject, race, from } from 'rxjs';
99
import { bufferWhen, filter, bufferCount, flatMap, mapTo, first } from 'rxjs/operators';
1010
import { either, Result, asOk, asErr, Ok, Err } from './result_type';
11+
import { Logger } from '../types';
1112

1213
export interface BufferOptions {
1314
bufferMaxDuration?: number;
1415
bufferMaxOperations?: number;
16+
logger?: Logger;
1517
}
1618

1719
export interface Entity {
@@ -41,39 +43,76 @@ const FLUSH = true;
4143

4244
export function createBuffer<Input extends Entity, ErrorOutput, Output extends Entity = Input>(
4345
bulkOperation: BulkOperation<Input, ErrorOutput, Output>,
44-
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE }: BufferOptions = {}
46+
{ bufferMaxDuration = 0, bufferMaxOperations = Number.MAX_VALUE, logger }: BufferOptions = {}
4547
): Operation<Input, ErrorOutput, Output> {
4648
const flushBuffer = new Subject<void>();
4749

4850
const storeUpdateBuffer = new Subject<{
4951
entity: Input;
5052
onSuccess: (entity: Ok<Output>) => void;
51-
onFailure: (error: Err<ErrorOutput>) => void;
53+
onFailure: (error: Err<ErrorOutput | Error>) => void;
5254
}>();
5355

5456
storeUpdateBuffer
5557
.pipe(
5658
bufferWhen(() => flushBuffer),
5759
filter((tasks) => tasks.length > 0)
5860
)
59-
.subscribe((entities) => {
60-
const entityById = keyBy(entities, ({ entity: { id } }) => id);
61-
bulkOperation(map(entities, 'entity'))
61+
.subscribe((bufferedEntities) => {
62+
bulkOperation(map(bufferedEntities, 'entity'))
6263
.then((results) => {
6364
results.forEach((result) =>
6465
either(
6566
result,
6667
(entity) => {
67-
entityById[entity.id].onSuccess(asOk(entity));
68+
either(
69+
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
70+
({ onSuccess }) => {
71+
onSuccess(asOk(entity));
72+
},
73+
() => {
74+
if (logger) {
75+
logger.warn(
76+
`Unhandled successful Bulk Operation result: ${
77+
entity?.id ? entity.id : entity
78+
}`
79+
);
80+
}
81+
}
82+
);
6883
},
6984
({ entity, error }: OperationError<Input, ErrorOutput>) => {
70-
entityById[entity.id].onFailure(asErr(error));
85+
either(
86+
pullFirstWhere(bufferedEntities, ({ entity: { id } }) => id === entity.id),
87+
({ onFailure }) => {
88+
onFailure(asErr(error));
89+
},
90+
() => {
91+
if (logger) {
92+
logger.warn(
93+
`Unhandled failed Bulk Operation result: ${entity?.id ? entity.id : entity}`
94+
);
95+
}
96+
}
97+
);
7198
}
7299
)
73100
);
101+
102+
// if any `bufferedEntities` remain in the array then there was no result we could map to them in the bulkOperation
103+
// call their failure handler to avoid hanging the promise returned to the call site
104+
bufferedEntities.forEach((unhandledBufferedEntity) => {
105+
unhandledBufferedEntity.onFailure(
106+
asErr(
107+
new Error(
108+
`Unhandled buffered operation for entity: ${unhandledBufferedEntity.entity.id}`
109+
)
110+
)
111+
);
112+
});
74113
})
75114
.catch((ex) => {
76-
entities.forEach(({ onFailure }) => onFailure(asErr(ex)));
115+
bufferedEntities.forEach(({ onFailure }) => onFailure(asErr(ex)));
77116
});
78117
});
79118

@@ -120,3 +159,10 @@ function resolveIn(ms: number) {
120159
setTimeout(resolve, ms);
121160
});
122161
}
162+
163+
function pullFirstWhere<T>(collection: T[], predicate: (entity: T) => boolean): Result<T, void> {
164+
const indexOfFirstEntity = collection.findIndex(predicate);
165+
return indexOfFirstEntity >= 0
166+
? asOk(collection.splice(indexOfFirstEntity, 1)[0])
167+
: asErr(undefined);
168+
}

x-pack/plugins/task_manager/server/task_manager.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ export class TaskManager {
146146

147147
this.bufferedStore = new BufferedTaskStore(this.store, {
148148
bufferMaxOperations: opts.config.max_workers,
149+
logger: this.logger,
149150
});
150151

151152
this.pool = new TaskPool({
@@ -283,7 +284,7 @@ export class TaskManager {
283284
*/
284285
public async schedule(
285286
taskInstance: TaskInstanceWithDeprecatedFields,
286-
options?: object
287+
options?: Record<string, unknown>
287288
): Promise<ConcreteTaskInstance> {
288289
await this.waitUntilStarted();
289290
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
@@ -318,7 +319,7 @@ export class TaskManager {
318319
*/
319320
public async ensureScheduled(
320321
taskInstance: TaskInstanceWithId,
321-
options?: object
322+
options?: Record<string, unknown>
322323
): Promise<TaskInstanceWithId> {
323324
try {
324325
return await this.schedule(taskInstance, options);

0 commit comments

Comments
 (0)