Skip to content

Commit f4ab5ed

Browse files
committed
time out if work overruns in poller
1 parent cdc7d25 commit f4ab5ed

File tree

8 files changed

+138
-6
lines changed

8 files changed

+138
-6
lines changed

x-pack/plugins/task_manager/server/config.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ describe('config validation', () => {
1313
"enabled": true,
1414
"index": ".kibana_task_manager",
1515
"max_attempts": 3,
16+
"max_poll_inactivity_cycles": 10,
1617
"max_workers": 10,
1718
"poll_interval": 3000,
1819
"request_capacity": 1000,

x-pack/plugins/task_manager/server/config.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';
88

99
export const DEFAULT_MAX_WORKERS = 10;
1010
export const DEFAULT_POLL_INTERVAL = 3000;
11+
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
1112

1213
export const configSchema = schema.object({
1314
enabled: schema.boolean({ defaultValue: true }),
@@ -21,6 +22,11 @@ export const configSchema = schema.object({
2122
defaultValue: DEFAULT_POLL_INTERVAL,
2223
min: 100,
2324
}),
25+
/* How many poll interval cycles can work take before it's timed out. */
26+
max_poll_inactivity_cycles: schema.number({
27+
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
28+
min: 1,
29+
}),
2430
/* How many requests can Task Manager buffer before it rejects new requests. */
2531
request_capacity: schema.number({
2632
// a nice round contrived number, feel free to change as we learn how it behaves
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import { timeoutPromiseAfter } from './timeout_promise_after';
8+
9+
const delay = (ms: number, result: unknown) =>
10+
new Promise((resolve) => setTimeout(() => resolve(result), ms));
11+
12+
const delayRejection = (ms: number, result: unknown) =>
13+
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));
14+
15+
// Exercises the three outcomes of `timeoutPromiseAfter`: the wrapped promise
// resolves in time, rejects in time, or is beaten by the timeout.
describe('Promise Timeout', () => {
  test('resolves when wrapped promise resolves', async () => {
    // The wrapped promise settles at 100ms, well before the 1000ms timeout,
    // so the timeout value must never surface.
    return expect(
      timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
    ).resolves.toMatchInlineSnapshot(`"OK"`);
  });

  test('rejects when wrapped promise rejects', async () => {
    // A rejection from the wrapped promise is passed through untouched.
    return expect(
      timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
    ).rejects.toMatchInlineSnapshot(`"ERR"`);
  });

  test('rejects if the timeout elapses', async () => {
    // The timeout (100ms) wins the race against the wrapped promise (1000ms),
    // so the rejection carries whatever `onTimeout` returned.
    return expect(
      timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'ERR')
    ).rejects.toMatchInlineSnapshot(`"ERR"`);
  });
});
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
/**
 * Races `future` against a timer and settles with whichever finishes first.
 *
 * - If `future` settles first, its resolution or rejection is passed through and
 *   the pending timer is cancelled, so `onTimeout` is never invoked afterwards
 *   and no stray timer outlives the call.
 * - If the timer fires first, the returned promise rejects with the value
 *   produced by `onTimeout`. `future` itself is not cancelled; its eventual
 *   outcome is simply ignored.
 *
 * @param future - the promise to guard with a timeout
 * @param ms - how long to wait, in milliseconds, before rejecting
 * @param onTimeout - factory for the rejection reason used when the timer wins;
 *   when omitted, a generic `Error` is used
 * @returns a promise that mirrors `future` unless the timeout elapses first
 */
export function timeoutPromiseAfter<T, G>(
  future: Promise<T>,
  ms: number,
  onTimeout?: () => G
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      try {
        reject(onTimeout ? onTimeout() : new Error(`promise timed out after ${ms}ms`));
      } catch (err) {
        // A throwing factory must reject the promise rather than escape as an
        // uncaught exception from the timer callback.
        reject(err);
      }
    }, ms);

    future.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

x-pack/plugins/task_manager/server/task_manager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ export class TaskManager {
159159
getCapacity: () => this.pool.availableWorkers,
160160
pollRequests$: this.claimRequests$,
161161
work: this.pollForWork,
162+
// Time out the `work` phase if it takes longer than a certain number of polling cycles
163+
// The `work` phase includes the prework needed *before* executing a task
164+
// (such as polling for new work, marking tasks as running etc.) but does not
165+
// include the time of actually running the task
166+
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
162167
});
163168
}
164169

x-pack/plugins/task_manager/server/task_poller.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
99
import { Option, none, some } from 'fp-ts/lib/Option';
1010
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
1111
import { fakeSchedulers } from 'rxjs-marbles/jest';
12-
import { sleep, resolvable } from './test_utils';
12+
import { sleep, resolvable, Resolvable } from './test_utils';
1313
import { asOk, asErr } from './lib/result_type';
1414

1515
describe('TaskPoller', () => {
@@ -243,6 +243,7 @@ describe('TaskPoller', () => {
243243
},
244244
getCapacity: () => 5,
245245
pollRequests$,
246+
workTimeout: pollInterval * 5,
246247
}).subscribe(handler);
247248

248249
pollRequests$.next(some('one'));
@@ -272,6 +273,68 @@ describe('TaskPoller', () => {
272273
})
273274
);
274275

276+
// Verifies that a `work` call which outlives `workTimeout` is reported to the
// subscriber as a WorkError, and that a later batch can still complete normally.
test(
  'work times out when it exceeds a predefined amount of time',
  fakeSchedulers(async (advance) => {
    const pollInterval = 100;
    const workTimeout = pollInterval * 2;
    const bufferCapacity = 2;

    const handler = jest.fn();

    // Each request pairs a name with a promise the test resolves by hand, so the
    // test decides exactly when (or whether) a given `work` call finishes.
    type ResolvableTuple = [string, PromiseLike<void> & Resolvable];
    const pollRequests$ = new Subject<Option<ResolvableTuple>>();
    createTaskPoller<ResolvableTuple, string[]>({
      pollInterval,
      bufferCapacity,
      work: async (...resolvables) => {
        // `work` only completes once every request in the batch has been resolved
        await Promise.all(resolvables.map(([, future]) => future));
        return resolvables.map(([name]) => name);
      },
      getCapacity: () => 5,
      pollRequests$,
      workTimeout,
    }).subscribe(handler);

    const one: ResolvableTuple = ['one', resolvable()];
    pollRequests$.next(some(one));

    // split these into two payloads
    advance(pollInterval);

    const two: ResolvableTuple = ['two', resolvable()];
    const three: ResolvableTuple = ['three', resolvable()];
    pollRequests$.next(some(two));
    pollRequests$.next(some(three));

    // let the full timeout window elapse while `one` is still unresolved
    advance(workTimeout);
    await sleep(workTimeout);

    // one resolves too late!
    one[1].resolve();

    expect(handler).toHaveBeenCalledWith(
      asErr(
        new PollingError<string>(
          'Failed to poll for work: Error: work has timed out',
          PollingErrorType.WorkError,
          none
        )
      )
    );
    expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);

    // two and three resolve in time
    two[1].resolve();
    three[1].resolve();

    advance(pollInterval);
    await sleep(pollInterval);

    expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
  })
);
337+
275338
test(
276339
'returns an error when polling for work fails',
277340
fakeSchedulers(async (advance) => {

x-pack/plugins/task_manager/server/task_poller.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
asErr,
2626
promiseResult,
2727
} from './lib/result_type';
28+
import { timeoutPromiseAfter } from './lib/timeout_promise_after';
2829

2930
type WorkFn<T, H> = (...params: T[]) => Promise<H>;
3031

@@ -34,6 +35,7 @@ interface Opts<T, H> {
3435
getCapacity: () => number;
3536
pollRequests$: Observable<Option<T>>;
3637
work: WorkFn<T, H>;
38+
workTimeout?: number;
3739
}
3840

3941
/**
@@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
5557
pollRequests$,
5658
bufferCapacity,
5759
work,
60+
workTimeout,
5861
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
5962
const hasCapacity = () => getCapacity() > 0;
6063

@@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
8992
concatMap(async (set: Set<T>) => {
9093
closeSleepPerf();
9194
return mapResult<H, Error, Result<H, PollingError<T>>>(
92-
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
95+
await promiseResult<H, Error>(
96+
timeoutPromiseAfter<H, Error>(
97+
work(...pullFromSet(set, getCapacity())),
98+
workTimeout ?? pollInterval,
99+
() => new Error(`work has timed out`)
100+
)
101+
),
93102
(workResult) => asOk(workResult),
94-
(err: Error) => {
95-
return asPollingError<T>(err, PollingErrorType.WorkError);
96-
}
103+
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
97104
);
98105
}),
99106
tap(openSleepPerf),
@@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(
129136

130137
export enum PollingErrorType {
131138
WorkError,
139+
WorkTimeout,
132140
RequestCapacityReached,
133141
}
134142

x-pack/plugins/task_manager/server/test_utils/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export function mockLogger() {
2323
};
2424
}
2525

26-
interface Resolvable {
26+
export interface Resolvable {
2727
resolve: () => void;
2828
}
2929

0 commit comments

Comments
 (0)