Skip to content

Commit 7bd057e

Browse files
committed
[Task Manager] Batches the update operations in Task Manager (elastic#71470)
This PR attempts to batch update tasks in Task Manager in order to avoid overloading the Elasticsearch queue. This is the 1st PR addressing elastic#65551 Under the hood we now use a Reactive buffer accumulates all calls to the `update` api in the TaskStore and flushes after 50ms or when as many operations as there are workers have been buffered (whichever comes first).
1 parent 1df44e5 commit 7bd057e

File tree

10 files changed

+685
-7
lines changed

10 files changed

+685
-7
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import uuid from 'uuid';
8+
import { taskStoreMock } from './task_store.mock';
9+
import { BufferedTaskStore } from './buffered_task_store';
10+
import { asErr, asOk } from './lib/result_type';
11+
import { TaskStatus } from './task';
12+
13+
describe('Buffered Task Store', () => {
14+
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
15+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
16+
taskStore.bulkUpdate.mockResolvedValue([]);
17+
const bufferedStore = new BufferedTaskStore(taskStore, {});
18+
19+
expect(bufferedStore.maxAttempts).toEqual(10);
20+
21+
bufferedStore.remove('1');
22+
expect(taskStore.remove).toHaveBeenCalledWith('1');
23+
});
24+
25+
describe('update', () => {
26+
test("proxies the TaskStore's `bulkUpdate`", async () => {
27+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
28+
const bufferedStore = new BufferedTaskStore(taskStore, {});
29+
30+
const task = mockTask();
31+
32+
taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);
33+
34+
expect(await bufferedStore.update(task)).toMatchObject(task);
35+
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
36+
});
37+
38+
test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
39+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
40+
const bufferedStore = new BufferedTaskStore(taskStore, {});
41+
42+
const tasks = [mockTask(), mockTask(), mockTask()];
43+
44+
taskStore.bulkUpdate.mockResolvedValueOnce([
45+
asOk(tasks[0]),
46+
asErr({ entity: tasks[1], error: new Error('Oh no, something went terribly wrong') }),
47+
asOk(tasks[2]),
48+
]);
49+
50+
const results = [
51+
bufferedStore.update(tasks[0]),
52+
bufferedStore.update(tasks[1]),
53+
bufferedStore.update(tasks[2]),
54+
];
55+
expect(await results[0]).toMatchObject(tasks[0]);
56+
expect(results[1]).rejects.toMatchInlineSnapshot(
57+
`[Error: Oh no, something went terribly wrong]`
58+
);
59+
expect(await results[2]).toMatchObject(tasks[2]);
60+
});
61+
});
62+
});
63+
64+
function mockTask() {
65+
return {
66+
id: `task_${uuid.v4()}`,
67+
attempts: 0,
68+
schedule: undefined,
69+
params: { hello: 'world' },
70+
retryAt: null,
71+
runAt: new Date(),
72+
scheduledAt: new Date(),
73+
scope: undefined,
74+
startedAt: null,
75+
state: { foo: 'bar' },
76+
status: TaskStatus.Idle,
77+
taskType: 'report',
78+
user: undefined,
79+
version: '123',
80+
ownerId: '123',
81+
};
82+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import { TaskStore } from './task_store';
8+
import { ConcreteTaskInstance } from './task';
9+
import { Updatable } from './task_runner';
10+
import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer';
11+
import { unwrapPromise } from './lib/result_type';
12+
13+
// by default allow updates to be buffered for up to 50ms
14+
const DEFAULT_BUFFER_MAX_DURATION = 50;
15+
16+
export class BufferedTaskStore implements Updatable {
17+
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
18+
constructor(private readonly taskStore: TaskStore, options: BufferOptions) {
19+
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(
20+
(docs) => taskStore.bulkUpdate(docs),
21+
{
22+
bufferMaxDuration: DEFAULT_BUFFER_MAX_DURATION,
23+
...options,
24+
}
25+
);
26+
}
27+
28+
public get maxAttempts(): number {
29+
return this.taskStore.maxAttempts;
30+
}
31+
32+
public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
33+
return unwrapPromise(this.bufferedUpdate(doc));
34+
}
35+
36+
public async remove(id: string): Promise<void> {
37+
return this.taskStore.remove(id);
38+
}
39+
}

0 commit comments

Comments
 (0)