Skip to content

Commit dcc2312

Browse files
committed
added missing unit tests
1 parent 19246f5 commit dcc2312

File tree

6 files changed

+179
-41
lines changed

6 files changed

+179
-41
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import uuid from 'uuid';
8+
import { taskStoreMock } from './task_store.mock';
9+
import { BufferedTaskStore } from './buffered_task_store';
10+
import { asErr, asOk } from './lib/result_type';
11+
import { TaskStatus } from './task';
12+
13+
describe('Buffered Task Store', () => {
14+
test('proxies the TaskStore for `maxAttempts` and `remove`', async () => {
15+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
16+
taskStore.bulkUpdate.mockResolvedValue([]);
17+
const bufferedStore = new BufferedTaskStore(taskStore);
18+
19+
expect(bufferedStore.maxAttempts).toEqual(10);
20+
21+
bufferedStore.remove('1');
22+
expect(taskStore.remove).toHaveBeenCalledWith('1');
23+
});
24+
25+
describe('update', () => {
26+
test("proxies the TaskStore's `bulkUpdate`", async () => {
27+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
28+
const bufferedStore = new BufferedTaskStore(taskStore);
29+
30+
const task = mockTask();
31+
32+
taskStore.bulkUpdate.mockResolvedValue([asOk(task)]);
33+
34+
expect(await bufferedStore.update(task)).toMatchObject(task);
35+
expect(taskStore.bulkUpdate).toHaveBeenCalledWith([task]);
36+
});
37+
38+
test('handles partially successfull bulkUpdates resolving each call appropriately', async () => {
39+
const taskStore = taskStoreMock.create({ maxAttempts: 10 });
40+
const bufferedStore = new BufferedTaskStore(taskStore);
41+
42+
const tasks = [mockTask(), mockTask(), mockTask()];
43+
44+
taskStore.bulkUpdate.mockResolvedValueOnce([
45+
asOk(tasks[0]),
46+
asErr(new Error('Oh no, something went terribly wrong')),
47+
asOk(tasks[2]),
48+
]);
49+
50+
const results = [
51+
bufferedStore.update(tasks[0]),
52+
bufferedStore.update(tasks[1]),
53+
bufferedStore.update(tasks[2]),
54+
];
55+
expect(await results[0]).toMatchObject(tasks[0]);
56+
expect(results[1]).rejects.toMatchInlineSnapshot(
57+
`[Error: Oh no, something went terribly wrong]`
58+
);
59+
expect(await results[2]).toMatchObject(tasks[2]);
60+
});
61+
});
62+
});
63+
64+
function mockTask() {
65+
return {
66+
id: `task_${uuid.v4()}`,
67+
attempts: 0,
68+
schedule: undefined,
69+
params: { hello: 'world' },
70+
retryAt: null,
71+
runAt: new Date(),
72+
scheduledAt: new Date(),
73+
scope: undefined,
74+
startedAt: null,
75+
state: { foo: 'bar' },
76+
status: TaskStatus.Idle,
77+
taskType: 'report',
78+
user: undefined,
79+
version: '123',
80+
ownerId: '123',
81+
};
82+
}

x-pack/plugins/task_manager/server/buffered_task_store.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,27 @@ import { TaskStore } from './task_store';
88
import { ConcreteTaskInstance } from './task';
99
import { Updatable } from './task_runner';
1010
import { createBuffer, Operation } from './lib/bulk_operation_buffer';
11-
import { unwrapPromise, mapErr } from './lib/result_type';
11+
import { unwrapPromise, asErr, mapErr } from './lib/result_type';
1212

1313
export class BufferedTaskStore implements Updatable {
14-
private bufferedUpdate: Operation<ConcreteTaskInstance, Error>;
14+
private bufferedUpdate: Operation<ConcreteTaskInstance, ConcreteTaskInstance, Error>;
1515
constructor(private readonly taskStore: TaskStore) {
16-
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, Error>(async (docs) => {
17-
return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) =>
18-
mapErr(
19-
(error: Error) => ({
20-
entity: docs[index],
21-
error,
22-
}),
23-
entityOrError
24-
)
25-
);
26-
});
16+
this.bufferedUpdate = createBuffer<ConcreteTaskInstance, ConcreteTaskInstance, Error>(
17+
async (docs) => {
18+
return (await taskStore.bulkUpdate(docs)).map((entityOrError, index) =>
19+
mapErr(
20+
(error: Error) =>
21+
asErr({
22+
// TaskStore's bulkUpdate maintains the order of the docs
23+
// so we can rely on the index in the `docs` to match an entity with an index
24+
entity: docs[index],
25+
error,
26+
}),
27+
entityOrError
28+
)
29+
);
30+
}
31+
);
2732
}
2833

2934
public get maxAttempts(): number {

x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.test.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ function errorAttempts(task: TaskInstance): Err<OperationError<TaskInstance, Err
3333
});
3434
}
3535

36-
describe('Task Store Buffer', () => {
36+
describe('Bulk Operation Buffer', () => {
3737
describe('createBuffer()', () => {
3838
test('batches up multiple Operation calls', async () => {
39-
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
39+
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
4040
([task1, task2]) => {
4141
return Promise.resolve([incrementAttempts(task1), incrementAttempts(task2)]);
4242
}
@@ -55,9 +55,11 @@ describe('Task Store Buffer', () => {
5555
});
5656

5757
test('batch updates are executed at most by the next Event Loop tick', async () => {
58-
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn((tasks) => {
59-
return Promise.resolve(tasks.map(incrementAttempts));
60-
});
58+
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
59+
(tasks) => {
60+
return Promise.resolve(tasks.map(incrementAttempts));
61+
}
62+
);
6163

6264
const bufferedUpdate = createBuffer(bulkUpdate);
6365

@@ -97,7 +99,7 @@ describe('Task Store Buffer', () => {
9799
});
98100

99101
test('handles both resolutions and rejections at individual task level', async (done) => {
100-
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(
102+
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
101103
([task1, task2, task3]) => {
102104
return Promise.resolve([
103105
incrementAttempts(task1),
@@ -129,9 +131,11 @@ describe('Task Store Buffer', () => {
129131
});
130132

131133
test('handles bulkUpdate failure', async (done) => {
132-
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, Error>> = jest.fn(() => {
133-
return Promise.reject(new Error('bulkUpdate is an illusion'));
134-
});
134+
const bulkUpdate: jest.Mocked<BulkOperation<TaskInstance, TaskInstance, Error>> = jest.fn(
135+
() => {
136+
return Promise.reject(new Error('bulkUpdate is an illusion'));
137+
}
138+
);
135139

136140
const bufferedUpdate = createBuffer(bulkUpdate);
137141

x-pack/plugins/task_manager/server/lib/bulk_operation_buffer.ts

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,31 @@ export interface Entity {
1313
id: string;
1414
}
1515

16-
export interface OperationError<H, E> {
17-
entity: H;
18-
error: E;
16+
export interface OperationError<Input, ErrorOutput> {
17+
entity: Input;
18+
error: ErrorOutput;
1919
}
2020

21-
export type OperationResult<H, E> = Result<H, OperationError<H, E>>;
21+
export type OperationResult<Input, Output, ErrorOutput> = Result<
22+
Output,
23+
OperationError<Input, ErrorOutput>
24+
>;
2225

23-
export type Operation<H, E> = (entity: H) => Promise<Result<H, E>>;
24-
export type BulkOperation<H, E> = (entities: H[]) => Promise<Array<OperationResult<H, E>>>;
26+
export type Operation<Input, Output, ErrorOutput> = (
27+
entity: Input
28+
) => Promise<Result<Output, ErrorOutput>>;
29+
export type BulkOperation<Input, Output, ErrorOutput> = (
30+
entities: Input[]
31+
) => Promise<Array<OperationResult<Input, Output, ErrorOutput>>>;
2532

26-
export function createBuffer<H extends Entity, E>(
27-
bulkOperation: BulkOperation<H, E>
28-
): Operation<H, E> {
33+
export function createBuffer<Input extends Entity, Output extends Entity, ErrorOutput>(
34+
bulkOperation: BulkOperation<Input, Output, ErrorOutput>
35+
): Operation<Input, Output, ErrorOutput> {
2936
const flushBuffer = new Subject<void>();
3037
const storeUpdateBuffer = new Subject<{
31-
entity: H;
32-
onSuccess: (entity: Ok<H>) => void;
33-
onFailure: (error: Err<E>) => void;
38+
entity: Input;
39+
onSuccess: (entity: Ok<Output>) => void;
40+
onFailure: (error: Err<ErrorOutput>) => void;
3441
}>();
3542

3643
storeUpdateBuffer
@@ -48,7 +55,7 @@ export function createBuffer<H extends Entity, E>(
4855
(entity) => {
4956
entityById[entity.id].onSuccess(asOk(entity));
5057
},
51-
({ entity, error }: OperationError<H, E>) => {
58+
({ entity, error }: OperationError<Input, ErrorOutput>) => {
5259
entityById[entity.id].onFailure(asErr(error));
5360
}
5461
)
@@ -59,7 +66,7 @@ export function createBuffer<H extends Entity, E>(
5966
});
6067
});
6168

62-
return async function (entity: H) {
69+
return async function (entity: Input) {
6370
return new Promise((resolve, reject) => {
6471
// ensure we flush by the end of the "current" event loop tick
6572
setImmediate(() => flushBuffer.next());

x-pack/plugins/task_manager/server/lib/result_type.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,34 @@ export function asErr<T>(error: T): Err<T> {
3131
};
3232
}
3333

34+
export function isResult<T, E>(maybeResult: unknown): maybeResult is Result<T, E> {
35+
return (
36+
(maybeResult as Result<T, E>)?.tag === 'ok' || (maybeResult as Result<T, E>)?.tag === 'err'
37+
);
38+
}
39+
3440
export function isOk<T, E>(result: Result<T, E>): result is Ok<T> {
35-
return result.tag === 'ok';
41+
return result?.tag === 'ok';
3642
}
3743

3844
export function isErr<T, E>(result: Result<T, E>): result is Err<E> {
3945
return !isOk(result);
4046
}
4147

42-
export async function promiseResult<T, E>(future: Promise<T>): Promise<Result<T, E>> {
48+
export async function promiseResult<T, E>(
49+
future: Promise<T | Result<T, E>>
50+
): Promise<Result<T, E>> {
4351
try {
44-
return asOk(await future);
52+
const result = await future;
53+
return isResult(result) ? result : asOk(result);
4554
} catch (e) {
46-
return asErr(e);
55+
return isResult<T, E>(e) ? e : asErr(e);
4756
}
4857
}
4958

5059
export async function unwrapPromise<T, E>(future: Promise<Result<T, E>>): Promise<T> {
5160
return map(
52-
await future,
61+
await promiseResult(future),
5362
(value: T) => Promise.resolve(value),
5463
(err: E) => Promise.reject(err)
5564
);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import { TaskStore } from './task_store';
8+
9+
interface TaskStoreOptions {
10+
maxAttempts?: number;
11+
index?: string;
12+
taskManagerId?: string;
13+
}
14+
export const taskStoreMock = {
15+
create({ maxAttempts = 0, index = '', taskManagerId = '' }: TaskStoreOptions) {
16+
const mocked = ({
17+
update: jest.fn(),
18+
remove: jest.fn(),
19+
schedule: jest.fn(),
20+
claimAvailableTasks: jest.fn(),
21+
bulkUpdate: jest.fn(),
22+
get: jest.fn(),
23+
getLifecycle: jest.fn(),
24+
fetch: jest.fn(),
25+
maxAttempts,
26+
index,
27+
taskManagerId,
28+
} as unknown) as jest.Mocked<TaskStore>;
29+
return mocked;
30+
},
31+
};

0 commit comments

Comments
 (0)