Skip to content

Commit 56ba753

Browse files
Don't start pollEsNodesVersion unless someone subscribes (#56923)
* Don't start pollEsNodesVersion unless someone subscribes By not polling until subscribed to, we prevent verbose error logs when the optimizer is run (which automatically skips migrations). * Test pollEsNodeVersions behaviour * Cleanup unused code * PR Feedback * Make test more stable Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent 9bdd23a commit 56ba753

File tree

3 files changed

+127
-63
lines changed

3 files changed

+127
-63
lines changed

src/core/server/elasticsearch/elasticsearch_service.test.ts

Lines changed: 99 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service';
3333
import { elasticsearchServiceMock } from './elasticsearch_service.mock';
3434
import { duration } from 'moment';
3535

36+
const delay = async (durationMs: number) =>
37+
await new Promise(resolve => setTimeout(resolve, durationMs));
38+
3639
let elasticsearchService: ElasticsearchService;
3740
const configService = configServiceMock.create();
3841
const deps = {
@@ -42,7 +45,7 @@ configService.atPath.mockReturnValue(
4245
new BehaviorSubject({
4346
hosts: ['http://1.2.3.4'],
4447
healthCheck: {
45-
delay: duration(2000),
48+
delay: duration(10),
4649
},
4750
ssl: {
4851
verificationMode: 'none',
@@ -125,21 +128,21 @@ describe('#setup', () => {
125128

126129
const config = MockClusterClient.mock.calls[0][0];
127130
expect(config).toMatchInlineSnapshot(`
128-
Object {
129-
"healthCheckDelay": "PT2S",
130-
"hosts": Array [
131-
"http://8.8.8.8",
132-
],
133-
"logQueries": true,
134-
"requestHeadersWhitelist": Array [
135-
undefined,
136-
],
137-
"ssl": Object {
138-
"certificate": "certificate-value",
139-
"verificationMode": "none",
140-
},
141-
}
142-
`);
131+
Object {
132+
"healthCheckDelay": "PT0.01S",
133+
"hosts": Array [
134+
"http://8.8.8.8",
135+
],
136+
"logQueries": true,
137+
"requestHeadersWhitelist": Array [
138+
undefined,
139+
],
140+
"ssl": Object {
141+
"certificate": "certificate-value",
142+
"verificationMode": "none",
143+
},
144+
}
145+
`);
143146
});
144147
it('falls back to elasticsearch config if custom config not passed', async () => {
145148
const setupContract = await elasticsearchService.setup(deps);
@@ -150,24 +153,24 @@ Object {
150153

151154
const config = MockClusterClient.mock.calls[0][0];
152155
expect(config).toMatchInlineSnapshot(`
153-
Object {
154-
"healthCheckDelay": "PT2S",
155-
"hosts": Array [
156-
"http://1.2.3.4",
157-
],
158-
"requestHeadersWhitelist": Array [
159-
undefined,
160-
],
161-
"ssl": Object {
162-
"alwaysPresentCertificate": undefined,
163-
"certificate": undefined,
164-
"certificateAuthorities": undefined,
165-
"key": undefined,
166-
"keyPassphrase": undefined,
167-
"verificationMode": "none",
168-
},
169-
}
170-
`);
156+
Object {
157+
"healthCheckDelay": "PT0.01S",
158+
"hosts": Array [
159+
"http://1.2.3.4",
160+
],
161+
"requestHeadersWhitelist": Array [
162+
undefined,
163+
],
164+
"ssl": Object {
165+
"alwaysPresentCertificate": undefined,
166+
"certificate": undefined,
167+
"certificateAuthorities": undefined,
168+
"key": undefined,
169+
"keyPassphrase": undefined,
170+
"verificationMode": "none",
171+
},
172+
}
173+
`);
171174
});
172175

173176
it('does not merge elasticsearch hosts if custom config overrides', async () => {
@@ -213,6 +216,45 @@ Object {
213216
`);
214217
});
215218
});
219+
220+
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => {
221+
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
222+
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
223+
MockClusterClient.mockImplementationOnce(
224+
() => mockAdminClusterClientInstance
225+
).mockImplementationOnce(() => mockDataClusterClientInstance);
226+
227+
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
228+
229+
const setupContract = await elasticsearchService.setup(deps);
230+
await delay(10);
231+
232+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
233+
setupContract.esNodesCompatibility$.subscribe(() => {
234+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
235+
done();
236+
});
237+
});
238+
239+
it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => {
240+
const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient();
241+
const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient();
242+
MockClusterClient.mockImplementationOnce(
243+
() => mockAdminClusterClientInstance
244+
).mockImplementationOnce(() => mockDataClusterClientInstance);
245+
246+
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
247+
248+
const setupContract = await elasticsearchService.setup(deps);
249+
250+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0);
251+
const sub = setupContract.esNodesCompatibility$.subscribe(async () => {
252+
sub.unsubscribe();
253+
await delay(100);
254+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
255+
done();
256+
});
257+
});
216258
});
217259

218260
describe('#stop', () => {
@@ -229,4 +271,27 @@ describe('#stop', () => {
229271
expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1);
230272
expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1);
231273
});
274+
275+
it('stops pollEsNodeVersions even if there are active subscriptions', async done => {
276+
expect.assertions(2);
277+
const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
278+
const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient();
279+
280+
MockClusterClient.mockImplementationOnce(
281+
() => mockAdminClusterClientInstance
282+
).mockImplementationOnce(() => mockDataClusterClientInstance);
283+
284+
mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error());
285+
286+
const setupContract = await elasticsearchService.setup(deps);
287+
288+
setupContract.esNodesCompatibility$.subscribe(async () => {
289+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
290+
291+
await elasticsearchService.stop();
292+
await delay(100);
293+
expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1);
294+
done();
295+
});
296+
});
232297
});

src/core/server/elasticsearch/elasticsearch_service.ts

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,17 @@
1717
* under the License.
1818
*/
1919

20-
import { ConnectableObservable, Observable, Subscription } from 'rxjs';
21-
import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators';
20+
import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs';
21+
import {
22+
filter,
23+
first,
24+
map,
25+
publishReplay,
26+
switchMap,
27+
take,
28+
shareReplay,
29+
takeUntil,
30+
} from 'rxjs/operators';
2231

2332
import { CoreService } from '../../types';
2433
import { merge } from '../../utils';
@@ -47,13 +56,8 @@ interface SetupDeps {
4756
export class ElasticsearchService implements CoreService<InternalElasticsearchServiceSetup> {
4857
private readonly log: Logger;
4958
private readonly config$: Observable<ElasticsearchConfig>;
50-
private subscriptions: {
51-
client?: Subscription;
52-
esNodesCompatibility?: Subscription;
53-
} = {
54-
client: undefined,
55-
esNodesCompatibility: undefined,
56-
};
59+
private subscription: Subscription | undefined;
60+
private stop$ = new Subject();
5761
private kibanaVersion: string;
5862

5963
constructor(private readonly coreContext: CoreContext) {
@@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
6973

7074
const clients$ = this.config$.pipe(
7175
filter(() => {
72-
if (this.subscriptions.client !== undefined) {
76+
if (this.subscription !== undefined) {
7377
this.log.error('Clients cannot be changed after they are created');
7478
return false;
7579
}
@@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
100104
publishReplay(1)
101105
) as ConnectableObservable<CoreClusterClients>;
102106

103-
this.subscriptions.client = clients$.connect();
107+
this.subscription = clients$.connect();
104108

105109
const config = await this.config$.pipe(first()).toPromise();
106110

@@ -164,18 +168,7 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
164168
ignoreVersionMismatch: config.ignoreVersionMismatch,
165169
esVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
166170
kibanaVersion: this.kibanaVersion,
167-
}).pipe(publishReplay(1));
168-
169-
this.subscriptions.esNodesCompatibility = (esNodesCompatibility$ as ConnectableObservable<
170-
unknown
171-
>).connect();
172-
173-
// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
174-
esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
175-
if (!isCompatible && message) {
176-
this.log.error(message);
177-
}
178-
});
171+
}).pipe(takeUntil(this.stop$), shareReplay({ refCount: true, bufferSize: 1 }));
179172

180173
return {
181174
legacy: { config$: clients$.pipe(map(clients => clients.config)) },
@@ -195,12 +188,10 @@ export class ElasticsearchService implements CoreService<InternalElasticsearchSe
195188

196189
public async stop() {
197190
this.log.debug('Stopping elasticsearch service');
198-
// TODO(TS-3.7-ESLINT)
199-
// eslint-disable-next-line no-unused-expressions
200-
this.subscriptions.client?.unsubscribe();
201-
// eslint-disable-next-line no-unused-expressions
202-
this.subscriptions.esNodesCompatibility?.unsubscribe();
203-
this.subscriptions = { client: undefined, esNodesCompatibility: undefined };
191+
if (this.subscription !== undefined) {
192+
this.subscription.unsubscribe();
193+
}
194+
this.stop$.next();
204195
}
205196

206197
private createClusterClient(

src/core/server/saved_objects/saved_objects_service.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,14 @@ export class SavedObjectsService
387387
this.logger.info(
388388
'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...'
389389
);
390+
391+
// TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983
392+
this.setupDeps!.elasticsearch.esNodesCompatibility$.subscribe(({ isCompatible, message }) => {
393+
if (!isCompatible && message) {
394+
this.logger.error(message);
395+
}
396+
});
397+
390398
await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe(
391399
filter(nodes => nodes.isCompatible),
392400
take(1)

0 commit comments

Comments
 (0)