Skip to content

Commit d46b604

Browse files
authored
Reactively disable Task Manager lifecycle when core services become unavailable (#81779) (#81991)
Plugs the Task Manager polling lifecycle into the Kibana Services Status streams in order to ensure we reactively start and stop polling whenever the Elasticsearch or SavedObjects service switch between `available` and `unavailable`. This will prevent Task Manager from polling whenever these services switch to an `unavailable` state.
1 parent 233a3e3 commit d46b604

File tree

9 files changed

+301
-74
lines changed

9 files changed

+301
-74
lines changed

x-pack/plugins/task_manager/server/monitoring/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,20 @@ export {
2828
export function createMonitoringStats(
2929
taskPollingLifecycle: TaskPollingLifecycle,
3030
taskStore: TaskStore,
31+
elasticsearchAndSOAvailability$: Observable<boolean>,
3132
config: TaskManagerConfig,
3233
managedConfig: ManagedConfiguration,
3334
logger: Logger
3435
): Observable<MonitoringStats> {
3536
return createMonitoringStatsStream(
36-
createAggregators(taskPollingLifecycle, taskStore, config, managedConfig, logger),
37+
createAggregators(
38+
taskPollingLifecycle,
39+
taskStore,
40+
elasticsearchAndSOAvailability$,
41+
config,
42+
managedConfig,
43+
logger
44+
),
3745
config
3846
);
3947
}

x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export interface RawMonitoringStats {
6363
export function createAggregators(
6464
taskPollingLifecycle: TaskPollingLifecycle,
6565
taskStore: TaskStore,
66+
elasticsearchAndSOAvailability$: Observable<boolean>,
6667
config: TaskManagerConfig,
6768
managedConfig: ManagedConfiguration,
6869
logger: Logger
@@ -72,6 +73,7 @@ export function createAggregators(
7273
createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window),
7374
createWorkloadAggregator(
7475
taskStore,
76+
elasticsearchAndSOAvailability$,
7577
config.monitored_aggregated_stats_refresh_rate,
7678
config.poll_interval,
7779
logger

x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
1717
import { AggregationResultOf } from '../../../apm/typings/elasticsearch/aggregations';
1818
import { times } from 'lodash';
1919
import { taskStoreMock } from '../task_store.mock';
20+
import { of, Subject } from 'rxjs';
21+
import { sleep } from '../test_utils';
2022

2123
type MockESResult = ESSearchResponse<
2224
ConcreteTaskInstance,
@@ -75,6 +77,7 @@ describe('Workload Statistics Aggregator', () => {
7577

7678
const workloadAggregator = createWorkloadAggregator(
7779
taskStore,
80+
of(true),
7881
10,
7982
3000,
8083
loggingSystemMock.create().get()
@@ -231,6 +234,7 @@ describe('Workload Statistics Aggregator', () => {
231234

232235
const workloadAggregator = createWorkloadAggregator(
233236
taskStore,
237+
of(true),
234238
10,
235239
3000,
236240
loggingSystemMock.create().get()
@@ -252,12 +256,51 @@ describe('Workload Statistics Aggregator', () => {
252256
});
253257
});
254258

259+
test('skips summary of the workload when services are unavailable', async () => {
260+
const taskStore = taskStoreMock.create({});
261+
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());
262+
263+
const availability$ = new Subject<boolean>();
264+
265+
const workloadAggregator = createWorkloadAggregator(
266+
taskStore,
267+
availability$,
268+
10,
269+
3000,
270+
loggingSystemMock.create().get()
271+
);
272+
273+
return new Promise(async (resolve) => {
274+
workloadAggregator.pipe(first()).subscribe((result) => {
275+
expect(result.key).toEqual('workload');
276+
expect(result.value).toMatchObject({
277+
count: 4,
278+
task_types: {
279+
actions_telemetry: { count: 2, status: { idle: 2 } },
280+
alerting_telemetry: { count: 1, status: { idle: 1 } },
281+
session_cleanup: { count: 1, status: { idle: 1 } },
282+
},
283+
});
284+
resolve();
285+
});
286+
287+
availability$.next(false);
288+
289+
await sleep(10);
290+
expect(taskStore.aggregate).not.toHaveBeenCalled();
291+
await sleep(10);
292+
expect(taskStore.aggregate).not.toHaveBeenCalled();
293+
availability$.next(true);
294+
});
295+
});
296+
255297
test('returns a count of the overdue workload', async () => {
256298
const taskStore = taskStoreMock.create({});
257299
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());
258300

259301
const workloadAggregator = createWorkloadAggregator(
260302
taskStore,
303+
of(true),
261304
10,
262305
3000,
263306
loggingSystemMock.create().get()
@@ -280,6 +323,7 @@ describe('Workload Statistics Aggregator', () => {
280323

281324
const workloadAggregator = createWorkloadAggregator(
282325
taskStore,
326+
of(true),
283327
10,
284328
3000,
285329
loggingSystemMock.create().get()
@@ -307,6 +351,7 @@ describe('Workload Statistics Aggregator', () => {
307351

308352
const workloadAggregator = createWorkloadAggregator(
309353
taskStore,
354+
of(true),
310355
60 * 1000,
311356
3000,
312357
loggingSystemMock.create().get()
@@ -344,6 +389,7 @@ describe('Workload Statistics Aggregator', () => {
344389

345390
const workloadAggregator = createWorkloadAggregator(
346391
taskStore,
392+
of(true),
347393
15 * 60 * 1000,
348394
3000,
349395
loggingSystemMock.create().get()
@@ -392,7 +438,7 @@ describe('Workload Statistics Aggregator', () => {
392438
})
393439
);
394440
const logger = loggingSystemMock.create().get();
395-
const workloadAggregator = createWorkloadAggregator(taskStore, 10, 3000, logger);
441+
const workloadAggregator = createWorkloadAggregator(taskStore, of(true), 10, 3000, logger);
396442

397443
return new Promise((resolve, reject) => {
398444
workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => {

x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { timer } from 'rxjs';
8-
import { mergeMap, map, catchError } from 'rxjs/operators';
7+
import { combineLatest, Observable, timer } from 'rxjs';
8+
import { mergeMap, map, filter, catchError } from 'rxjs/operators';
99
import { Logger } from 'src/core/server';
1010
import { JsonObject } from 'src/plugins/kibana_utils/common';
1111
import { keyBy, mapValues } from 'lodash';
@@ -94,6 +94,7 @@ const MAX_SHCEDULE_DENSITY_BUCKETS = 50;
9494

9595
export function createWorkloadAggregator(
9696
taskStore: TaskStore,
97+
elasticsearchAndSOAvailability$: Observable<boolean>,
9798
refreshInterval: number,
9899
pollInterval: number,
99100
logger: Logger
@@ -105,7 +106,8 @@ export function createWorkloadAggregator(
105106
MAX_SHCEDULE_DENSITY_BUCKETS
106107
);
107108

108-
return timer(0, refreshInterval).pipe(
109+
return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe(
110+
filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable),
109111
mergeMap(() =>
110112
taskStore.aggregate({
111113
aggs: {

x-pack/plugins/task_manager/server/plugin.test.ts

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { TaskManagerPlugin } from './plugin';
7+
import { TaskManagerPlugin, getElasticsearchAndSOAvailability } from './plugin';
88
import { coreMock } from '../../../../src/core/server/mocks';
99
import { TaskManagerConfig } from './config';
10+
import { Subject } from 'rxjs';
11+
import { bufferCount, take } from 'rxjs/operators';
12+
import { CoreStatus, ServiceStatusLevels } from 'src/core/server';
1013

1114
describe('TaskManagerPlugin', () => {
1215
describe('setup', () => {
@@ -88,4 +91,99 @@ describe('TaskManagerPlugin', () => {
8891
);
8992
});
9093
});
94+
95+
describe('getElasticsearchAndSOAvailability', () => {
96+
test('returns true when both services are available', async () => {
97+
const core$ = new Subject<CoreStatus>();
98+
99+
const availability = getElasticsearchAndSOAvailability(core$)
100+
.pipe(take(1), bufferCount(1))
101+
.toPromise();
102+
103+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
104+
105+
expect(await availability).toEqual([true]);
106+
});
107+
108+
test('returns false when both services are unavailable', async () => {
109+
const core$ = new Subject<CoreStatus>();
110+
111+
const availability = getElasticsearchAndSOAvailability(core$)
112+
.pipe(take(1), bufferCount(1))
113+
.toPromise();
114+
115+
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
116+
117+
expect(await availability).toEqual([false]);
118+
});
119+
120+
test('returns false when one service is unavailable but the other is available', async () => {
121+
const core$ = new Subject<CoreStatus>();
122+
123+
const availability = getElasticsearchAndSOAvailability(core$)
124+
.pipe(take(1), bufferCount(1))
125+
.toPromise();
126+
127+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
128+
129+
expect(await availability).toEqual([false]);
130+
});
131+
132+
test('shift back and forth between values as status changes', async () => {
133+
const core$ = new Subject<CoreStatus>();
134+
135+
const availability = getElasticsearchAndSOAvailability(core$)
136+
.pipe(take(3), bufferCount(3))
137+
.toPromise();
138+
139+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
140+
141+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
142+
143+
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
144+
145+
expect(await availability).toEqual([false, true, false]);
146+
});
147+
148+
test(`skips values when the status hasn't changed`, async () => {
149+
const core$ = new Subject<CoreStatus>();
150+
151+
const availability = getElasticsearchAndSOAvailability(core$)
152+
.pipe(take(3), bufferCount(3))
153+
.toPromise();
154+
155+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false }));
156+
157+
// still false, so shouldn't emit a second time
158+
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: true }));
159+
160+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
161+
162+
// shouldn't emit as already true
163+
core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true }));
164+
165+
core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false }));
166+
167+
expect(await availability).toEqual([false, true, false]);
168+
});
169+
});
91170
});
171+
172+
function mockCoreStatusAvailability({
173+
elasticsearch,
174+
savedObjects,
175+
}: {
176+
elasticsearch: boolean;
177+
savedObjects: boolean;
178+
}) {
179+
return {
180+
elasticsearch: {
181+
level: elasticsearch ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
182+
summary: '',
183+
},
184+
savedObjects: {
185+
level: savedObjects ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
186+
summary: '',
187+
},
188+
};
189+
}

x-pack/plugins/task_manager/server/plugin.ts

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server';
7-
import { combineLatest, Subject } from 'rxjs';
8-
import { first, map } from 'rxjs/operators';
6+
import { combineLatest, Observable, Subject } from 'rxjs';
7+
import { first, map, distinctUntilChanged } from 'rxjs/operators';
8+
import {
9+
PluginInitializerContext,
10+
Plugin,
11+
CoreSetup,
12+
Logger,
13+
CoreStart,
14+
ServiceStatusLevels,
15+
CoreStatus,
16+
} from '../../../../src/core/server';
917
import { TaskDefinition } from './task';
1018
import { TaskPollingLifecycle } from './polling_lifecycle';
1119
import { TaskManagerConfig } from './config';
@@ -37,6 +45,7 @@ export class TaskManagerPlugin
3745
private logger: Logger;
3846
private definitions: TaskTypeDictionary;
3947
private middleware: Middleware = createInitialMiddleware();
48+
private elasticsearchAndSOAvailability$?: Observable<boolean>;
4049
private monitoringStats$ = new Subject<MonitoringStats>();
4150

4251
constructor(private readonly initContext: PluginInitializerContext) {
@@ -51,6 +60,8 @@ export class TaskManagerPlugin
5160
.pipe(first())
5261
.toPromise();
5362

63+
this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);
64+
5465
setupSavedObjects(core.savedObjects, this.config);
5566
this.taskManagerId = this.initContext.env.instanceUuid;
5667

@@ -115,19 +126,20 @@ export class TaskManagerPlugin
115126
startingPollInterval: this.config!.poll_interval,
116127
});
117128

118-
const taskPollingLifecycle = new TaskPollingLifecycle({
129+
this.taskPollingLifecycle = new TaskPollingLifecycle({
119130
config: this.config!,
120131
definitions: this.definitions,
121132
logger: this.logger,
122133
taskStore,
123134
middleware: this.middleware,
135+
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
124136
...managedConfiguration,
125137
});
126-
this.taskPollingLifecycle = taskPollingLifecycle;
127138

128139
createMonitoringStats(
129-
taskPollingLifecycle,
140+
this.taskPollingLifecycle,
130141
taskStore,
142+
this.elasticsearchAndSOAvailability$!,
131143
this.config!,
132144
managedConfiguration,
133145
this.logger
@@ -137,12 +149,9 @@ export class TaskManagerPlugin
137149
logger: this.logger,
138150
taskStore,
139151
middleware: this.middleware,
140-
taskPollingLifecycle,
152+
taskPollingLifecycle: this.taskPollingLifecycle,
141153
});
142154

143-
// start polling for work
144-
taskPollingLifecycle.start();
145-
146155
return {
147156
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
148157
get: (id: string) => taskStore.get(id),
@@ -153,12 +162,6 @@ export class TaskManagerPlugin
153162
};
154163
}
155164

156-
public stop() {
157-
if (this.taskPollingLifecycle) {
158-
this.taskPollingLifecycle.stop();
159-
}
160-
}
161-
162165
/**
163166
* Ensures task manager hasn't started
164167
*
@@ -171,3 +174,16 @@ export class TaskManagerPlugin
171174
}
172175
}
173176
}
177+
178+
export function getElasticsearchAndSOAvailability(
179+
core$: Observable<CoreStatus>
180+
): Observable<boolean> {
181+
return core$.pipe(
182+
map(
183+
({ elasticsearch, savedObjects }) =>
184+
elasticsearch.level === ServiceStatusLevels.available &&
185+
savedObjects.level === ServiceStatusLevels.available
186+
),
187+
distinctUntilChanged()
188+
);
189+
}

0 commit comments

Comments
 (0)