Skip to content

Commit d9ad2d6

Browse files
authored
[Task manager] Prevents edge case where already running tasks are rescheduled every polling interval (#74606) (#74941)
Fixes flaky tests in Task Manager and Alerting. The fix in #73244 was correct, but it missed an edge case which causes an already running task to be rescheduled over and over. This prevents that edge case, which was affecting both TM in general and Alerting specifically.
1 parent 0c21024 commit d9ad2d6

File tree

3 files changed

+124
-38
lines changed

3 files changed

+124
-38
lines changed

x-pack/plugins/task_manager/server/task_store.test.ts

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) {
627627
});
628628
});
629629

630-
test('it returns task objects', async () => {
630+
test('it filters out running tasks', async () => {
631631
const taskManagerId = uuid.v1();
632632
const claimOwnershipUntil = new Date(Date.now());
633633
const runAt = new Date();
@@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) {
641641
taskType: 'foo',
642642
schedule: undefined,
643643
attempts: 0,
644-
status: 'idle',
644+
status: 'claiming',
645645
params: '{ "hello": "world" }',
646646
state: '{ "baby": "Henhen" }',
647647
user: 'jimbo',
@@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) {
715715
runAt,
716716
scope: ['reporting'],
717717
state: { baby: 'Henhen' },
718-
status: 'idle',
718+
status: 'claiming',
719+
taskType: 'foo',
720+
user: 'jimbo',
721+
ownerId: taskManagerId,
722+
},
723+
]);
724+
});
725+
726+
test('it returns task objects', async () => {
727+
const taskManagerId = uuid.v1();
728+
const claimOwnershipUntil = new Date(Date.now());
729+
const runAt = new Date();
730+
const tasks = [
731+
{
732+
_id: 'aaa',
733+
_source: {
734+
type: 'task',
735+
task: {
736+
runAt,
737+
taskType: 'foo',
738+
schedule: undefined,
739+
attempts: 0,
740+
status: 'claiming',
741+
params: '{ "hello": "world" }',
742+
state: '{ "baby": "Henhen" }',
743+
user: 'jimbo',
744+
scope: ['reporting'],
745+
ownerId: taskManagerId,
746+
},
747+
},
748+
_seq_no: 1,
749+
_primary_term: 2,
750+
sort: ['a', 1],
751+
},
752+
{
753+
_id: 'bbb',
754+
_source: {
755+
type: 'task',
756+
task: {
757+
runAt,
758+
taskType: 'bar',
759+
schedule: { interval: '5m' },
760+
attempts: 2,
761+
status: 'claiming',
762+
params: '{ "shazm": 1 }',
763+
state: '{ "henry": "The 8th" }',
764+
user: 'dabo',
765+
scope: ['reporting', 'ceo'],
766+
ownerId: taskManagerId,
767+
},
768+
},
769+
_seq_no: 3,
770+
_primary_term: 4,
771+
sort: ['b', 2],
772+
},
773+
];
774+
const {
775+
result: { docs },
776+
args: {
777+
search: {
778+
body: { query },
779+
},
780+
},
781+
} = await testClaimAvailableTasks({
782+
opts: {
783+
taskManagerId,
784+
},
785+
claimingOpts: {
786+
claimOwnershipUntil,
787+
size: 10,
788+
},
789+
hits: tasks,
790+
});
791+
792+
expect(query.bool.must).toContainEqual({
793+
bool: {
794+
must: [
795+
{
796+
term: {
797+
'task.ownerId': taskManagerId,
798+
},
799+
},
800+
{ term: { 'task.status': 'claiming' } },
801+
],
802+
},
803+
});
804+
805+
expect(docs).toMatchObject([
806+
{
807+
attempts: 0,
808+
id: 'aaa',
809+
schedule: undefined,
810+
params: { hello: 'world' },
811+
runAt,
812+
scope: ['reporting'],
813+
state: { baby: 'Henhen' },
814+
status: 'claiming',
719815
taskType: 'foo',
720816
user: 'jimbo',
721817
ownerId: taskManagerId,
@@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) {
728824
runAt,
729825
scope: ['reporting', 'ceo'],
730826
state: { henry: 'The 8th' },
731-
status: 'running',
827+
status: 'claiming',
732828
taskType: 'bar',
733829
user: 'dabo',
734830
ownerId: taskManagerId,

x-pack/plugins/task_manager/server/task_store.ts

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -216,48 +216,39 @@ export class TaskStore {
216216
claimTasksByIdWithRawIds,
217217
size
218218
);
219+
219220
const docs =
220221
numberOfTasksClaimed > 0
221222
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size)
222223
: [];
223224

224-
// emit success/fail events for claimed tasks by id
225-
if (claimTasksById && claimTasksById.length) {
226-
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
227-
claimTasksById.includes(doc.id)
228-
);
229-
230-
const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
231-
documentsReturnedById,
232-
// we filter the schduled tasks down by status is 'claiming' in the esearch,
233-
// but we do not apply this limitation on tasks claimed by ID so that we can
234-
// provide more detailed error messages when we fail to claim them
235-
(doc) => doc.status === TaskStatus.Claiming
236-
);
237-
238-
const documentsRequestedButNotReturned = difference(
239-
claimTasksById,
240-
map(documentsReturnedById, 'id')
241-
);
225+
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
226+
claimTasksById.includes(doc.id)
227+
);
242228

243-
this.emitEvents(
244-
[...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) =>
245-
asTaskClaimEvent(doc.id, asOk(doc))
246-
)
247-
);
229+
const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
230+
documentsReturnedById,
231+
// we filter the scheduled tasks down to those whose status is 'claiming' in the ES search,
232+
// but we do not apply this limitation on tasks claimed by ID so that we can
233+
// provide more detailed error messages when we fail to claim them
234+
(doc) => doc.status === TaskStatus.Claiming
235+
);
248236

249-
this.emitEvents(
250-
documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc))))
251-
);
237+
const documentsRequestedButNotReturned = difference(
238+
claimTasksById,
239+
map(documentsReturnedById, 'id')
240+
);
252241

253-
this.emitEvents(
254-
documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none)))
255-
);
256-
}
242+
this.emitEvents([
243+
...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
244+
...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
245+
...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))),
246+
...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))),
247+
]);
257248

258249
return {
259-
claimedTasks: numberOfTasksClaimed,
260-
docs,
250+
claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length,
251+
docs: docs.filter((doc) => doc.status === TaskStatus.Claiming),
261252
};
262253
};
263254

x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ export default function ({ getService }) {
2828
const testHistoryIndex = '.kibana_task_manager_test_result';
2929
const supertest = supertestAsPromised(url.format(config.get('servers.kibana')));
3030

31-
// FLAKY: https://github.com/elastic/kibana/issues/71390
32-
describe.skip('scheduling and running tasks', () => {
31+
describe('scheduling and running tasks', () => {
3332
beforeEach(
3433
async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200)
3534
);

0 commit comments

Comments
 (0)