Skip to content

Commit 52747c9

Browse files
Only fetching TaskManager's available tasks once per call to fillPool (#61991)
Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent 7ea3f12 commit 52747c9

File tree

2 files changed

+24
-29
lines changed

2 files changed

+24
-29
lines changed

x-pack/plugins/task_manager/server/lib/fill_pool.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { fillPool } from './fill_pool';
1010
import { TaskPoolRunResult } from '../task_pool';
1111

1212
describe('fillPool', () => {
13-
test('stops filling when there are no more tasks in the store', async () => {
13+
test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => {
1414
const tasks = [
1515
[1, 2, 3],
1616
[4, 5],
@@ -22,7 +22,7 @@ describe('fillPool', () => {
2222

2323
await fillPool(fetchAvailableTasks, converter, run);
2424

25-
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]);
25+
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
2626
});
2727

2828
test('stops filling when the pool has no more capacity', async () => {

x-pack/plugins/task_manager/server/lib/fill_pool.ts

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
*/
66

77
import { performance } from 'perf_hooks';
8-
import { after } from 'lodash';
98
import { TaskPoolRunResult } from '../task_pool';
109

1110
export enum FillPoolResult {
1211
NoTasksClaimed = 'NoTasksClaimed',
1312
RanOutOfCapacity = 'RanOutOfCapacity',
13+
PoolFilled = 'PoolFilled',
1414
}
1515

1616
type BatchRun<T> = (tasks: T[]) => Promise<TaskPoolRunResult>;
@@ -35,33 +35,28 @@ export async function fillPool<TRecord, TRunner>(
3535
run: BatchRun<TRunner>
3636
): Promise<FillPoolResult> {
3737
performance.mark('fillPool.start');
38-
const markClaimedTasksOnRerunCycle = after(2, () =>
39-
performance.mark('fillPool.claimedOnRerunCycle')
40-
);
41-
while (true) {
42-
const instances = await fetchAvailableTasks();
38+
const instances = await fetchAvailableTasks();
4339

44-
if (!instances.length) {
45-
performance.mark('fillPool.bailNoTasks');
46-
performance.measure(
47-
'fillPool.activityDurationUntilNoTasks',
48-
'fillPool.start',
49-
'fillPool.bailNoTasks'
50-
);
51-
return FillPoolResult.NoTasksClaimed;
52-
}
53-
markClaimedTasksOnRerunCycle();
54-
const tasks = instances.map(converter);
40+
if (!instances.length) {
41+
performance.mark('fillPool.bailNoTasks');
42+
performance.measure(
43+
'fillPool.activityDurationUntilNoTasks',
44+
'fillPool.start',
45+
'fillPool.bailNoTasks'
46+
);
47+
return FillPoolResult.NoTasksClaimed;
48+
}
49+
const tasks = instances.map(converter);
5550

56-
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
57-
performance.mark('fillPool.bailExhaustedCapacity');
58-
performance.measure(
59-
'fillPool.activityDurationUntilExhaustedCapacity',
60-
'fillPool.start',
61-
'fillPool.bailExhaustedCapacity'
62-
);
63-
return FillPoolResult.RanOutOfCapacity;
64-
}
65-
performance.mark('fillPool.cycle');
51+
if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) {
52+
performance.mark('fillPool.bailExhaustedCapacity');
53+
performance.measure(
54+
'fillPool.activityDurationUntilExhaustedCapacity',
55+
'fillPool.start',
56+
'fillPool.bailExhaustedCapacity'
57+
);
58+
return FillPoolResult.RanOutOfCapacity;
6659
}
60+
performance.mark('fillPool.cycle');
61+
return FillPoolResult.PoolFilled;
6762
}

0 commit comments

Comments
 (0)