Skip to content

Commit b763ea6

Browse files
committed
[Task Manager] time out work when it overruns in poller (#74980)
If the work performed by the poller hangs, meaning its promise never resolves or rejects, the poller can get stuck waiting forever and stop polling for fresh work. This PR introduces a timeout after which the poller automatically rejects the work, freeing the poller to resume pulling fresh work.
1 parent 02d92cd commit b763ea6

File tree

10 files changed

+140
-6
lines changed

10 files changed

+140
-6
lines changed

x-pack/plugins/task_manager/server/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
4141

4242
- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
4343
- `poll_interval` - How often the background worker should check the task_manager index for more work
44+
- `max_poll_inactivity_cycles` - How many poll intervals work is allowed to block polling for before it is timed out. This does not include task execution, as task execution does not block polling; it covers only the work needed to manage Task Manager's state.
4445
- `index` - The name of the index that the task_manager
4546
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
4647
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.

x-pack/plugins/task_manager/server/config.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ describe('config validation', () => {
1313
"enabled": true,
1414
"index": ".kibana_task_manager",
1515
"max_attempts": 3,
16+
"max_poll_inactivity_cycles": 10,
1617
"max_workers": 10,
1718
"poll_interval": 3000,
1819
"request_capacity": 1000,

x-pack/plugins/task_manager/server/config.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';
88

99
export const DEFAULT_MAX_WORKERS = 10;
1010
export const DEFAULT_POLL_INTERVAL = 3000;
11+
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
1112

1213
export const configSchema = schema.object({
1314
enabled: schema.boolean({ defaultValue: true }),
@@ -21,6 +22,11 @@ export const configSchema = schema.object({
2122
defaultValue: DEFAULT_POLL_INTERVAL,
2223
min: 100,
2324
}),
25+
/* How many poll interval cycles can work take before it's timed out. */
26+
max_poll_inactivity_cycles: schema.number({
27+
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
28+
min: 1,
29+
}),
2430
/* How many requests can Task Manager buffer before it rejects new requests. */
2531
request_capacity: schema.number({
2632
// a nice round contrived number, feel free to change as we learn how it behaves
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import { timeoutPromiseAfter } from './timeout_promise_after';
8+
9+
const delay = (ms: number, result: unknown) =>
10+
new Promise((resolve) => setTimeout(() => resolve(result), ms));
11+
12+
const delayRejection = (ms: number, result: unknown) =>
13+
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));
14+
15+
describe('Promise Timeout', () => {
16+
test('resolves when wrapped promise resolves', async () => {
17+
return expect(
18+
timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
19+
).resolves.toMatchInlineSnapshot(`"OK"`);
20+
});
21+
22+
test('reject when wrapped promise rejects', async () => {
23+
return expect(
24+
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
25+
).rejects.toMatchInlineSnapshot(`"ERR"`);
26+
});
27+
28+
test('reject it the timeout elapses', async () => {
29+
return expect(
30+
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
31+
).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
32+
});
33+
});
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
/**
 * Races `future` against a timer.
 *
 * The returned promise settles the same way `future` does if `future` settles
 * within `ms` milliseconds; otherwise it rejects with the value produced by
 * `onTimeout()`.
 *
 * The timer is cancelled as soon as `future` settles, so `onTimeout` is only
 * ever invoked when the timeout actually elapses first, and no pending timer
 * is left behind to keep the event loop alive.
 *
 * @param future - the promise to guard with a timeout
 * @param ms - how long to wait, in milliseconds, before giving up on `future`
 * @param onTimeout - factory for the rejection reason used when the timeout elapses
 * @returns a promise that mirrors `future`, or rejects with `onTimeout()` on timeout
 */
export function timeoutPromiseAfter<T, G>(
  future: Promise<T>,
  ms: number,
  onTimeout: () => G
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      try {
        reject(onTimeout());
      } catch (err) {
        // A throwing factory should surface as a rejection, not an uncaught exception.
        reject(err);
      }
    }, ms);

    future.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (reason) => {
        clearTimeout(timer);
        reject(reason);
      }
    );
  });
}

x-pack/plugins/task_manager/server/task_manager.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ describe('TaskManager', () => {
4040
index: 'foo',
4141
max_attempts: 9,
4242
poll_interval: 6000000,
43+
max_poll_inactivity_cycles: 10,
4344
request_capacity: 1000,
4445
};
4546
const taskManagerOpts = {

x-pack/plugins/task_manager/server/task_manager.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ export class TaskManager {
160160
getCapacity: () => this.pool.availableWorkers,
161161
pollRequests$: this.claimRequests$,
162162
work: this.pollForWork,
163+
// Time out the `work` phase if it takes longer than a certain number of polling cycles
164+
// The `work` phase includes the prework needed *before* executing a task
165+
// (such as polling for new work, marking tasks as running etc.) but does not
166+
// include the time of actually running the task
167+
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
163168
});
164169
}
165170

x-pack/plugins/task_manager/server/task_poller.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
99
import { Option, none, some } from 'fp-ts/lib/Option';
1010
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
1111
import { fakeSchedulers } from 'rxjs-marbles/jest';
12-
import { sleep, resolvable } from './test_utils';
12+
import { sleep, resolvable, Resolvable } from './test_utils';
1313
import { asOk, asErr } from './lib/result_type';
1414

1515
describe('TaskPoller', () => {
@@ -243,6 +243,7 @@ describe('TaskPoller', () => {
243243
},
244244
getCapacity: () => 5,
245245
pollRequests$,
246+
workTimeout: pollInterval * 5,
246247
}).subscribe(handler);
247248

248249
pollRequests$.next(some('one'));
@@ -272,6 +273,68 @@ describe('TaskPoller', () => {
272273
})
273274
);
274275

276+
test(
277+
'work times out whe nit exceeds a predefined amount of time',
278+
fakeSchedulers(async (advance) => {
279+
const pollInterval = 100;
280+
const workTimeout = pollInterval * 2;
281+
const bufferCapacity = 2;
282+
283+
const handler = jest.fn();
284+
285+
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
286+
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
287+
createTaskPoller<[string, Resolvable], string[]>({
288+
pollInterval,
289+
bufferCapacity,
290+
work: async (...resolvables) => {
291+
await Promise.all(resolvables.map(([, future]) => future));
292+
return resolvables.map(([name]) => name);
293+
},
294+
getCapacity: () => 5,
295+
pollRequests$,
296+
workTimeout,
297+
}).subscribe(handler);
298+
299+
const one: ResolvableTupple = ['one', resolvable()];
300+
pollRequests$.next(some(one));
301+
302+
// split these into two payloads
303+
advance(pollInterval);
304+
305+
const two: ResolvableTupple = ['two', resolvable()];
306+
const three: ResolvableTupple = ['three', resolvable()];
307+
pollRequests$.next(some(two));
308+
pollRequests$.next(some(three));
309+
310+
advance(workTimeout);
311+
await sleep(workTimeout);
312+
313+
// one resolves too late!
314+
one[1].resolve();
315+
316+
expect(handler).toHaveBeenCalledWith(
317+
asErr(
318+
new PollingError<string>(
319+
'Failed to poll for work: Error: work has timed out',
320+
PollingErrorType.WorkError,
321+
none
322+
)
323+
)
324+
);
325+
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);
326+
327+
// two and three in time
328+
two[1].resolve();
329+
three[1].resolve();
330+
331+
advance(pollInterval);
332+
await sleep(pollInterval);
333+
334+
expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
335+
})
336+
);
337+
275338
test(
276339
'returns an error when polling for work fails',
277340
fakeSchedulers(async (advance) => {

x-pack/plugins/task_manager/server/task_poller.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import {
2525
asErr,
2626
promiseResult,
2727
} from './lib/result_type';
28+
import { timeoutPromiseAfter } from './lib/timeout_promise_after';
2829

2930
type WorkFn<T, H> = (...params: T[]) => Promise<H>;
3031

@@ -34,6 +35,7 @@ interface Opts<T, H> {
3435
getCapacity: () => number;
3536
pollRequests$: Observable<Option<T>>;
3637
work: WorkFn<T, H>;
38+
workTimeout?: number;
3739
}
3840

3941
/**
@@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
5557
pollRequests$,
5658
bufferCapacity,
5759
work,
60+
workTimeout,
5861
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
5962
const hasCapacity = () => getCapacity() > 0;
6063

@@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
8992
concatMap(async (set: Set<T>) => {
9093
closeSleepPerf();
9194
return mapResult<H, Error, Result<H, PollingError<T>>>(
92-
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
95+
await promiseResult<H, Error>(
96+
timeoutPromiseAfter<H, Error>(
97+
work(...pullFromSet(set, getCapacity())),
98+
workTimeout ?? pollInterval,
99+
() => new Error(`work has timed out`)
100+
)
101+
),
93102
(workResult) => asOk(workResult),
94-
(err: Error) => {
95-
return asPollingError<T>(err, PollingErrorType.WorkError);
96-
}
103+
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
97104
);
98105
}),
99106
tap(openSleepPerf),
@@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(
129136

130137
export enum PollingErrorType {
131138
WorkError,
139+
WorkTimeout,
132140
RequestCapacityReached,
133141
}
134142

x-pack/plugins/task_manager/server/test_utils/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export function mockLogger() {
2323
};
2424
}
2525

26-
interface Resolvable {
26+
export interface Resolvable {
2727
resolve: () => void;
2828
}
2929

0 commit comments

Comments
 (0)