Skip to content

Commit 5c770e5

Browse files
authored
[Task Manager] Correctly handle running tasks when calling RunNow and reduce flakiness in related tests (#73244)
This PR addresses two issues which caused several tests to be flaky in TM. When `runNow` was introduced to TM we added a pinned query which returned specific tasks by ID. This query does not have the filter applied to it which causes task to return when they're already marked as `running` but we didn't address these correctly which caused flakyness in the tests. This didn't cause a broken beahviour, but it did cause beahviour that was hard to reason about - we now address them correctly. It seems that sometimes, especially if the ES queue is overworked, it can take some time for the update to the underlying task to be visible (we don't user `refresh:true` on purpose), so adding a wait for the index to refresh to make sure the task is updated in time for the next stage of the test.
1 parent cf6413a commit 5c770e5

File tree

9 files changed

+316
-90
lines changed

9 files changed

+316
-90
lines changed

x-pack/plugins/alerts/server/alerts_client.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -387,11 +387,18 @@ export class AlertsClient {
387387
updateResult.scheduledTaskId &&
388388
!isEqual(alertSavedObject.attributes.schedule, updateResult.schedule)
389389
) {
390-
this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => {
391-
this.logger.error(
392-
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
393-
);
394-
});
390+
this.taskManager
391+
.runNow(updateResult.scheduledTaskId)
392+
.then(() => {
393+
this.logger.debug(
394+
`Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}`
395+
);
396+
})
397+
.catch((err: Error) => {
398+
this.logger.error(
399+
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
400+
);
401+
});
395402
}
396403
})(),
397404
]);

x-pack/plugins/task_manager/server/task_events.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7+
import { Option } from 'fp-ts/lib/Option';
8+
79
import { ConcreteTaskInstance } from './task';
810

911
import { Result, Err } from './lib/result_type';
@@ -22,7 +24,7 @@ export interface TaskEvent<T, E> {
2224
}
2325
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
2426
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
25-
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
27+
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
2628
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;
2729

2830
export function asTaskMarkRunningEvent(
@@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, E
4648

4749
export function asTaskClaimEvent(
4850
id: string,
49-
event: Result<ConcreteTaskInstance, Error>
51+
event: Result<ConcreteTaskInstance, Option<ConcreteTaskInstance>>
5052
): TaskClaim {
5153
return {
5254
id,

x-pack/plugins/task_manager/server/task_manager.test.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import _ from 'lodash';
88
import sinon from 'sinon';
99
import { Subject } from 'rxjs';
10+
import { none } from 'fp-ts/lib/Option';
1011

1112
import {
1213
asTaskMarkRunningEvent,
@@ -297,7 +298,9 @@ describe('TaskManager', () => {
297298
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
298299
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
299300

300-
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
301+
return expect(result).rejects.toMatchInlineSnapshot(
302+
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
303+
);
301304
});
302305

303306
test('rejects when the task mark as running fails', () => {
@@ -311,7 +314,9 @@ describe('TaskManager', () => {
311314
events$.next(asTaskClaimEvent(id, asOk(task)));
312315
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));
313316

314-
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
317+
return expect(result).rejects.toMatchInlineSnapshot(
318+
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
319+
);
315320
});
316321

317322
test('when a task claim fails we ensure the task exists', async () => {
@@ -321,7 +326,7 @@ describe('TaskManager', () => {
321326

322327
const result = awaitTaskRunResult(id, events$, getLifecycle);
323328

324-
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
329+
events$.next(asTaskClaimEvent(id, asErr(none)));
325330

326331
await expect(result).rejects.toEqual(
327332
new Error(`Failed to run task "${id}" as it does not exist`)
@@ -337,7 +342,7 @@ describe('TaskManager', () => {
337342

338343
const result = awaitTaskRunResult(id, events$, getLifecycle);
339344

340-
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
345+
events$.next(asTaskClaimEvent(id, asErr(none)));
341346

342347
await expect(result).rejects.toEqual(
343348
new Error(`Failed to run task "${id}" as it is currently running`)
@@ -353,7 +358,7 @@ describe('TaskManager', () => {
353358

354359
const result = awaitTaskRunResult(id, events$, getLifecycle);
355360

356-
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
361+
events$.next(asTaskClaimEvent(id, asErr(none)));
357362

358363
await expect(result).rejects.toEqual(
359364
new Error(`Failed to run task "${id}" as it is currently running`)
@@ -386,9 +391,11 @@ describe('TaskManager', () => {
386391

387392
const result = awaitTaskRunResult(id, events$, getLifecycle);
388393

389-
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
394+
events$.next(asTaskClaimEvent(id, asErr(none)));
390395

391-
await expect(result).rejects.toEqual(new Error('failed to claim'));
396+
await expect(result).rejects.toMatchInlineSnapshot(
397+
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
398+
);
392399

393400
expect(getLifecycle).toHaveBeenCalledWith(id);
394401
});
@@ -400,9 +407,11 @@ describe('TaskManager', () => {
400407

401408
const result = awaitTaskRunResult(id, events$, getLifecycle);
402409

403-
events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
410+
events$.next(asTaskClaimEvent(id, asErr(none)));
404411

405-
await expect(result).rejects.toEqual(new Error('failed to claim'));
412+
await expect(result).rejects.toMatchInlineSnapshot(
413+
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
414+
);
406415

407416
expect(getLifecycle).toHaveBeenCalledWith(id);
408417
});
@@ -424,7 +433,9 @@ describe('TaskManager', () => {
424433

425434
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));
426435

427-
return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
436+
return expect(result).rejects.toMatchInlineSnapshot(
437+
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
438+
);
428439
});
429440
});
430441
});

x-pack/plugins/task_manager/server/task_manager.ts

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators';
99
import { performance } from 'perf_hooks';
1010

1111
import { pipe } from 'fp-ts/lib/pipeable';
12-
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
12+
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
13+
1314
import {
1415
SavedObjectsSerializer,
1516
ILegacyScopedClusterClient,
1617
ISavedObjectsRepository,
1718
} from '../../../../src/core/server';
18-
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
19+
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
1920
import { TaskManagerConfig } from './config';
2021

2122
import { Logger } from './types';
@@ -405,7 +406,9 @@ export async function claimAvailableTasks(
405406

406407
if (docs.length !== claimedTasks) {
407408
logger.warn(
408-
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
409+
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
410+
docs.length
411+
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
409412
);
410413
}
411414
return docs;
@@ -437,48 +440,65 @@ export async function awaitTaskRunResult(
437440
// listen for all events related to the current task
438441
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
439442
.subscribe((taskEvent: TaskLifecycleEvent) => {
440-
either(
441-
taskEvent.event,
442-
(taskInstance: ConcreteTaskInstance) => {
443-
// resolve if the task has run sucessfully
444-
if (isTaskRunEvent(taskEvent)) {
445-
subscription.unsubscribe();
446-
resolve({ id: taskInstance.id });
447-
}
448-
},
449-
async (error: Error) => {
443+
if (isTaskClaimEvent(taskEvent)) {
444+
mapErr(async (error: Option<ConcreteTaskInstance>) => {
450445
// reject if any error event takes place for the requested task
451446
subscription.unsubscribe();
452-
if (isTaskRunRequestEvent(taskEvent)) {
453-
return reject(
454-
new Error(
455-
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
456-
)
457-
);
458-
} else if (isTaskClaimEvent(taskEvent)) {
459-
reject(
460-
map(
461-
// if the error happened in the Claim phase - we try to provide better insight
462-
// into why we failed to claim by getting the task's current lifecycle status
463-
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
464-
(taskLifecycleStatus: TaskLifecycle) => {
465-
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
466-
return new Error(`Failed to run task "${taskId}" as it does not exist`);
467-
} else if (
468-
taskLifecycleStatus === TaskStatus.Running ||
469-
taskLifecycleStatus === TaskStatus.Claiming
470-
) {
471-
return new Error(`Failed to run task "${taskId}" as it is currently running`);
472-
}
473-
return error;
474-
},
475-
() => error
476-
)
477-
);
447+
return reject(
448+
map(
449+
await pipe(
450+
error,
451+
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
452+
getOrElse(() =>
453+
// if the error happened in the Claim phase - we try to provide better insight
454+
// into why we failed to claim by getting the task's current lifecycle status
455+
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
456+
)
457+
),
458+
(taskLifecycleStatus: TaskLifecycle) => {
459+
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
460+
return new Error(`Failed to run task "${taskId}" as it does not exist`);
461+
} else if (
462+
taskLifecycleStatus === TaskStatus.Running ||
463+
taskLifecycleStatus === TaskStatus.Claiming
464+
) {
465+
return new Error(`Failed to run task "${taskId}" as it is currently running`);
466+
}
467+
return new Error(
468+
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
469+
);
470+
},
471+
(getLifecycleError: Error) =>
472+
new Error(
473+
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
474+
)
475+
)
476+
);
477+
}, taskEvent.event);
478+
} else {
479+
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
480+
taskEvent.event,
481+
(taskInstance: ConcreteTaskInstance) => {
482+
// resolve if the task has run sucessfully
483+
if (isTaskRunEvent(taskEvent)) {
484+
subscription.unsubscribe();
485+
resolve({ id: taskInstance.id });
486+
}
487+
},
488+
async (error: Error | Option<ConcreteTaskInstance>) => {
489+
// reject if any error event takes place for the requested task
490+
subscription.unsubscribe();
491+
if (isTaskRunRequestEvent(taskEvent)) {
492+
return reject(
493+
new Error(
494+
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
495+
)
496+
);
497+
}
498+
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
478499
}
479-
return reject(error);
480-
}
481-
);
500+
);
501+
}
482502
});
483503
});
484504
}

0 commit comments

Comments
 (0)