Skip to content

Commit 3b0215c

Browse files
authored
[Task Manager] Ensures retries are inferred from the schedule of recurring tasks (#83682)
This addresses a bug in Task Manager in the task timeout behaviour. When a recurring task's `retryAt` field is set (which happens at task run), it is currently scheduled to the task definition's `timeout` value, but the original intention was for these tasks to retry on their next scheduled run (originally identified as part of #39349). In this PR we ensure recurring task retries are scheduled according to their recurring schedule, rather than the default `timeout` of the task type.
1 parent 4009edc commit 3b0215c

File tree

6 files changed

+190
-11
lines changed

6 files changed

+190
-11
lines changed

x-pack/plugins/task_manager/server/lib/intervals.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
secondsFromNow,
1515
secondsFromDate,
1616
asInterval,
17+
maxIntervalFromDate,
1718
} from './intervals';
1819

1920
let fakeTimer: sinon.SinonFakeTimers;
@@ -159,6 +160,44 @@ describe('taskIntervals', () => {
159160
});
160161
});
161162

163+
describe('maxIntervalFromDate', () => {
164+
test('it handles a single interval', () => {
165+
const mins = _.random(1, 100);
166+
const now = new Date();
167+
const expected = now.getTime() + mins * 60 * 1000;
168+
expect(maxIntervalFromDate(now, `${mins}m`)!.getTime()).toEqual(expected);
169+
});
170+
171+
test('it handles multiple intervals', () => {
172+
const mins = _.random(1, 100);
173+
const maxMins = mins + _.random(1, 100);
174+
const now = new Date();
175+
const expected = now.getTime() + maxMins * 60 * 1000;
176+
expect(maxIntervalFromDate(now, `${mins}m`, `${maxMins}m`)!.getTime()).toEqual(expected);
177+
});
178+
179+
test('it handles multiple mixed type intervals', () => {
180+
const mins = _.random(1, 100);
181+
const seconds = _.random(1, 100);
182+
const maxSeconds = Math.max(mins * 60, seconds) + _.random(1, 100);
183+
const now = new Date();
184+
const expected = now.getTime() + maxSeconds * 1000;
185+
expect(
186+
maxIntervalFromDate(now, `${mins}m`, `${maxSeconds}s`, `${seconds}s`)!.getTime()
187+
).toEqual(expected);
188+
});
189+
190+
test('it handles undefined intervals', () => {
191+
const mins = _.random(1, 100);
192+
const maxMins = mins + _.random(1, 100);
193+
const now = new Date();
194+
const expected = now.getTime() + maxMins * 60 * 1000;
195+
expect(maxIntervalFromDate(now, `${mins}m`, undefined, `${maxMins}m`)!.getTime()).toEqual(
196+
expected
197+
);
198+
});
199+
});
200+
162201
describe('intervalFromDate', () => {
163202
test('it returns the given date plus n minutes', () => {
164203
const originalDate = new Date(2019, 1, 1);

x-pack/plugins/task_manager/server/lib/intervals.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { memoize } from 'lodash';
7+
import { isString, memoize } from 'lodash';
88

99
export enum IntervalCadence {
1010
Minute = 'm',
@@ -57,6 +57,16 @@ export function intervalFromDate(date: Date, interval?: string): Date | undefine
5757
return secondsFromDate(date, parseIntervalAsSecond(interval));
5858
}
5959

60+
export function maxIntervalFromDate(
61+
date: Date,
62+
...intervals: Array<string | undefined>
63+
): Date | undefined {
64+
const maxSeconds = Math.max(...intervals.filter(isString).map(parseIntervalAsSecond));
65+
if (!isNaN(maxSeconds)) {
66+
return secondsFromDate(date, maxSeconds);
67+
}
68+
}
69+
6070
/**
6171
* Returns a date that is secs seconds from now.
6272
*

x-pack/plugins/task_manager/server/task_running/task_runner.test.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,102 @@ describe('TaskManagerRunner', () => {
393393
);
394394
});
395395

396+
test('calculates retryAt by schedule when running a recurring task', async () => {
397+
const intervalMinutes = 10;
398+
const id = _.random(1, 20).toString();
399+
const initialAttempts = _.random(0, 2);
400+
const { runner, store } = testOpts({
401+
instance: {
402+
id,
403+
attempts: initialAttempts,
404+
schedule: {
405+
interval: `${intervalMinutes}m`,
406+
},
407+
},
408+
definitions: {
409+
bar: {
410+
title: 'Bar!',
411+
createTaskRunner: () => ({
412+
run: async () => undefined,
413+
}),
414+
},
415+
},
416+
});
417+
418+
await runner.markTaskAsRunning();
419+
420+
sinon.assert.calledOnce(store.update);
421+
const instance = store.update.args[0][0];
422+
423+
expect(instance.retryAt.getTime()).toEqual(
424+
instance.startedAt.getTime() + intervalMinutes * 60 * 1000
425+
);
426+
});
427+
428+
test('calculates retryAt by default timout when it exceeds the schedule of a recurring task', async () => {
429+
const intervalSeconds = 20;
430+
const id = _.random(1, 20).toString();
431+
const initialAttempts = _.random(0, 2);
432+
const { runner, store } = testOpts({
433+
instance: {
434+
id,
435+
attempts: initialAttempts,
436+
schedule: {
437+
interval: `${intervalSeconds}s`,
438+
},
439+
},
440+
definitions: {
441+
bar: {
442+
title: 'Bar!',
443+
createTaskRunner: () => ({
444+
run: async () => undefined,
445+
}),
446+
},
447+
},
448+
});
449+
450+
await runner.markTaskAsRunning();
451+
452+
sinon.assert.calledOnce(store.update);
453+
const instance = store.update.args[0][0];
454+
455+
expect(instance.retryAt.getTime()).toEqual(instance.startedAt.getTime() + 5 * 60 * 1000);
456+
});
457+
458+
test('calculates retryAt by timeout if it exceeds the schedule when running a recurring task', async () => {
459+
const timeoutMinutes = 1;
460+
const intervalSeconds = 20;
461+
const id = _.random(1, 20).toString();
462+
const initialAttempts = _.random(0, 2);
463+
const { runner, store } = testOpts({
464+
instance: {
465+
id,
466+
attempts: initialAttempts,
467+
schedule: {
468+
interval: `${intervalSeconds}s`,
469+
},
470+
},
471+
definitions: {
472+
bar: {
473+
title: 'Bar!',
474+
timeout: `${timeoutMinutes}m`,
475+
createTaskRunner: () => ({
476+
run: async () => undefined,
477+
}),
478+
},
479+
},
480+
});
481+
482+
await runner.markTaskAsRunning();
483+
484+
sinon.assert.calledOnce(store.update);
485+
const instance = store.update.args[0][0];
486+
487+
expect(instance.retryAt.getTime()).toEqual(
488+
instance.startedAt.getTime() + timeoutMinutes * 60 * 1000
489+
);
490+
});
491+
396492
test('uses getRetry function (returning date) on error when defined', async () => {
397493
const initialAttempts = _.random(1, 3);
398494
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);

x-pack/plugins/task_manager/server/task_running/task_runner.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import {
2626
startTaskTimer,
2727
TaskTiming,
2828
} from '../task_events';
29-
import { intervalFromDate, intervalFromNow } from '../lib/intervals';
29+
import { intervalFromDate, maxIntervalFromDate } from '../lib/intervals';
3030
import {
3131
CancelFunction,
3232
CancellableTask,
@@ -259,15 +259,16 @@ export class TaskManagerRunner implements TaskRunner {
259259
status: TaskStatus.Running,
260260
startedAt: now,
261261
attempts,
262-
retryAt: this.instance.schedule
263-
? intervalFromNow(this.definition.timeout)!
264-
: this.getRetryDelay({
265-
attempts,
266-
// Fake an error. This allows retry logic when tasks keep timing out
267-
// and lets us set a proper "retryAt" value each time.
268-
error: new Error('Task timeout'),
269-
addDuration: this.definition.timeout,
270-
}) ?? null,
262+
retryAt:
263+
(this.instance.schedule
264+
? maxIntervalFromDate(now, this.instance.schedule!.interval, this.definition.timeout)
265+
: this.getRetryDelay({
266+
attempts,
267+
// Fake an error. This allows retry logic when tasks keep timing out
268+
// and lets us set a proper "retryAt" value each time.
269+
error: new Error('Task timeout'),
270+
addDuration: this.definition.timeout,
271+
})) ?? null,
271272
});
272273

273274
const timeUntilClaimExpiresAfterUpdate = howManyMsUntilOwnershipClaimExpires(

x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/plugin.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ export class SampleTaskManagerFixturePlugin
115115
},
116116
}),
117117
},
118+
sampleRecurringTaskWhichHangs: {
119+
title: 'Sample Recurring Task that Hangs for a minute',
120+
description: 'A sample task that Hangs for a minute on each run.',
121+
maxAttempts: 3,
122+
timeout: '60s',
123+
createTaskRunner: () => ({
124+
async run() {
125+
return await new Promise((resolve) => {});
126+
},
127+
}),
128+
},
118129
sampleOneTimeTaskTimingOut: {
119130
title: 'Sample One-Time Task that Times Out',
120131
description: 'A sample task that times out each run.',

x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,28 @@ export default function ({ getService }: FtrProviderContext) {
260260
});
261261
});
262262

263+
it('should schedule the retry of recurring tasks to run at the next schedule when they time out', async () => {
264+
const intervalInMinutes = 30;
265+
const intervalInMilliseconds = intervalInMinutes * 60 * 1000;
266+
const task = await scheduleTask({
267+
taskType: 'sampleRecurringTaskWhichHangs',
268+
schedule: { interval: `${intervalInMinutes}m` },
269+
params: {},
270+
});
271+
272+
await retry.try(async () => {
273+
const [scheduledTask] = (await currentTasks()).docs;
274+
expect(scheduledTask.id).to.eql(task.id);
275+
const retryAt = Date.parse(scheduledTask.retryAt!);
276+
expect(isNaN(retryAt)).to.be(false);
277+
278+
const buffer = 10000; // 10 second buffer
279+
const retryDelay = retryAt - Date.parse(task.runAt);
280+
expect(retryDelay).to.be.greaterThan(intervalInMilliseconds - buffer);
281+
expect(retryDelay).to.be.lessThan(intervalInMilliseconds + buffer);
282+
});
283+
});
284+
263285
it('should reschedule if task returns runAt', async () => {
264286
const nextRunMilliseconds = _.random(60000, 200000);
265287
const count = _.random(1, 20);

0 commit comments

Comments
 (0)