Skip to content

Commit b29e8ee

Browse files
pgayvallet and mshustov authored
migrate retryCallCluster for new ES client (#71412)
* adapt retryCallCluster for new ES client * review comments * retry on 408 ResponseError * use error name instead of instanceof base check * use error name instead of instanceof base check bis * use mockImplementationOnce chaining Co-authored-by: restrry <restrry@gmail.com>
1 parent ec4f9d5 commit b29e8ee

File tree

4 files changed

+389
-2
lines changed

4 files changed

+389
-2
lines changed

src/core/server/elasticsearch/client/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_clie
2222
export { ElasticsearchClientConfig } from './client_config';
2323
export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client';
2424
export { configureClient } from './configure_client';
25+
export { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import { errors } from '@elastic/elasticsearch';
21+
import { elasticsearchClientMock } from './mocks';
22+
import { loggingSystemMock } from '../../logging/logging_system.mock';
23+
import { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';
24+
25+
// Body used for every mocked successful response in this file; the tests
// compare the returned `body` against it.
const dummyBody = { foo: 'bar' };
26+
// Forwards `err` to `elasticsearchClientMock.createClientError`.
// NOTE(review): presumably the result stands in for a failing client call
// (the tests rely on it rejecting with `err`) — confirm in `./mocks`.
const createErrorReturn = (err: any) => elasticsearchClientMock.createClientError(err);
27+
28+
describe('retryCallCluster', () => {
  let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;

  // Builders for the mocked outcomes queued on the client in the tests below.
  const respondWithDummyBody = () =>
    elasticsearchClientMock.createClientResponse({ ...dummyBody });
  const failWithNoLivingConnections = () =>
    createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any));
  const failWithUnknownError = () => createErrorReturn(new Error('unknown error'));

  beforeEach(() => {
    // A fresh mock per test so queued implementations never leak between tests.
    client = elasticsearchClientMock.createElasticSearchClient();
  });

  it('returns response from ES API call in case of success', async () => {
    client.asyncSearch.get.mockReturnValue(respondWithDummyBody());

    const response = await retryCallCluster(() => client.asyncSearch.get());

    expect(response.body).toEqual(dummyBody);
  });

  it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
    // First attempt fails with the retryable error, second attempt succeeds.
    client.asyncSearch.get
      .mockImplementationOnce(failWithNoLivingConnections)
      .mockImplementationOnce(respondWithDummyBody);

    const response = await retryCallCluster(() => client.asyncSearch.get());

    expect(response.body).toEqual(dummyBody);
  });

  it('rejects when ES API calls reject with other errors', async () => {
    // The queued success must never be reached: the first error is not retryable.
    client.ping
      .mockImplementationOnce(failWithUnknownError)
      .mockImplementationOnce(respondWithDummyBody);

    const pending = retryCallCluster(() => client.ping());

    await expect(pending).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });

  it('stops retrying when ES API calls reject with other errors', async () => {
    // Two retryable failures, then a non-retryable one that must end the loop
    // before the queued success is reached.
    client.ping
      .mockImplementationOnce(failWithNoLivingConnections)
      .mockImplementationOnce(failWithNoLivingConnections)
      .mockImplementationOnce(failWithUnknownError)
      .mockImplementationOnce(respondWithDummyBody);

    const pending = retryCallCluster(() => client.ping());

    await expect(pending).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });
});
83+
84+
describe('migrationRetryCallCluster', () => {
  let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
  let logger: ReturnType<typeof loggingSystemMock.createLogger>;

  beforeEach(() => {
    client = elasticsearchClientMock.createElasticSearchClient();
    logger = loggingSystemMock.createLogger();
  });

  // Builders for the mocked outcomes queued on `client.ping`.
  const respondWithDummyBody = () =>
    elasticsearchClientMock.createClientResponse({ ...dummyBody });
  const failWithServiceUnavailable = () =>
    createErrorReturn(new errors.ResponseError({ statusCode: 503 } as any));
  const failWithConnectionError = () =>
    createErrorReturn(new errors.ConnectionError('connection error', {} as any));
  const failWithTimeout = () =>
    createErrorReturn(new errors.TimeoutError('timeout error', {} as any));
  const failWithSnapshotInProgress = () =>
    createErrorReturn(
      new errors.ResponseError({
        statusCode: 500,
        body: { error: { type: 'snapshot_in_progress_exception' } },
      } as any)
    );

  // Queues two failures with the same `error` instance on `client.ping`,
  // followed by one success whose body equals `dummyBody`.
  const mockClientPingWithErrorBeforeSuccess = (error: any) => {
    client.ping
      .mockImplementationOnce(() => createErrorReturn(error))
      .mockImplementationOnce(() => createErrorReturn(error))
      .mockImplementationOnce(respondWithDummyBody);
  };

  // Runs the ping through `migrationRetryCallCluster` with a 1 ms retry delay
  // and asserts that the eventual successful response is the one returned.
  const expectPingToSucceedAfterRetries = async () => {
    const result = await migrationRetryCallCluster(() => client.ping(), logger, 1);
    expect(result.body).toEqual(dummyBody);
  };

  it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
    mockClientPingWithErrorBeforeSuccess(
      new errors.NoLivingConnectionsError('no living connections', {} as any)
    );

    await expectPingToSucceedAfterRetries();
  });

  it('retries ES API calls that rejects with `ConnectionError`', async () => {
    mockClientPingWithErrorBeforeSuccess(new errors.ConnectionError('connection error', {} as any));

    await expectPingToSucceedAfterRetries();
  });

  it('retries ES API calls that rejects with `TimeoutError`', async () => {
    mockClientPingWithErrorBeforeSuccess(new errors.TimeoutError('timeout error', {} as any));

    await expectPingToSucceedAfterRetries();
  });

  // One case per retryable HTTP status. Table-driven so every status gets an
  // identically worded title (the 401 title previously lacked the word "with").
  it.each([503, 401, 403, 408, 410])(
    'retries ES API calls that rejects with %d `ResponseError`',
    async (statusCode) => {
      mockClientPingWithErrorBeforeSuccess(new errors.ResponseError({ statusCode } as any));

      await expectPingToSucceedAfterRetries();
    }
  );

  it('retries ES API calls that rejects with `snapshot_in_progress_exception` `ResponseError`', async () => {
    mockClientPingWithErrorBeforeSuccess(
      new errors.ResponseError({
        statusCode: 500,
        body: {
          error: {
            type: 'snapshot_in_progress_exception',
          },
        },
      } as any)
    );

    await expectPingToSucceedAfterRetries();
  });

  it('logs only once for each unique error message', async () => {
    // The 503 and connection errors each occur twice; only three distinct
    // messages are expected in the log.
    client.ping
      .mockImplementationOnce(failWithServiceUnavailable)
      .mockImplementationOnce(failWithConnectionError)
      .mockImplementationOnce(failWithServiceUnavailable)
      .mockImplementationOnce(failWithConnectionError)
      .mockImplementationOnce(failWithSnapshotInProgress)
      .mockImplementationOnce(respondWithDummyBody);

    await migrationRetryCallCluster(() => client.ping(), logger, 1);

    expect(loggingSystemMock.collect(logger).warn).toMatchInlineSnapshot(`
      Array [
        Array [
          "Unable to connect to Elasticsearch. Error: Response Error",
        ],
        Array [
          "Unable to connect to Elasticsearch. Error: connection error",
        ],
        Array [
          "Unable to connect to Elasticsearch. Error: snapshot_in_progress_exception",
        ],
      ]
    `);
  });

  it('rejects when ES API calls reject with other errors', async () => {
    // 418 is not in the retryable status list, so the queued success must
    // never be reached.
    client.ping
      .mockImplementationOnce(() =>
        createErrorReturn(
          new errors.ResponseError({
            statusCode: 418,
            body: {
              error: {
                type: `I'm a teapot`,
              },
            },
          } as any)
        )
      )
      .mockImplementationOnce(respondWithDummyBody);

    await expect(
      migrationRetryCallCluster(() => client.ping(), logger, 1)
    ).rejects.toMatchInlineSnapshot(`[ResponseError: I'm a teapot]`);
  });

  it('stops retrying when ES API calls reject with other errors', async () => {
    // Two retryable timeouts, then a plain `Error` that must end the loop.
    client.ping
      .mockImplementationOnce(failWithTimeout)
      .mockImplementationOnce(failWithTimeout)
      .mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
      .mockImplementationOnce(respondWithDummyBody);

    await expect(
      migrationRetryCallCluster(() => client.ping(), logger, 1)
    ).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });
});
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import { defer, throwError, iif, timer } from 'rxjs';
21+
import { concatMap, retryWhen } from 'rxjs/operators';
22+
import { Logger } from '../../logging';
23+
24+
// HTTP status codes for which `migrationRetryCallCluster` retries a call that
// failed with a `ResponseError`.
const retryResponseStatuses = [
25+
503, // ServiceUnavailable
26+
401, // AuthenticationException (Unauthorized)
27+
403, // AuthorizationException (Forbidden)
28+
408, // RequestTimeout
29+
410, // Gone
30+
];
31+
32+
/**
 * Invokes the given Elasticsearch API call and re-invokes it for as long as it
 * fails with a `NoLivingConnectionsError`, waiting one second between
 * attempts. There is no attempt limit: the returned promise settles on the
 * first successful response, or rejects with the first error of any other
 * kind.
 *
 * @example
 * ```ts
 * const response = await retryCallCluster(() => client.ping());
 * ```
 *
 * @internal
 */
export const retryCallCluster = <T extends Promise<unknown>>(apiCaller: () => T): T => {
  // `defer` makes every (re)subscription trigger a brand new API call.
  const attempt$ = defer(() => apiCaller());

  const attemptWithRetry$ = attempt$.pipe(
    retryWhen((failure$) =>
      failure$.pipe(
        // Emitting from the timer triggers a retry; erroring ends the sequence
        // and rejects the promise with the original failure.
        concatMap((failure) =>
          iif(() => failure.name === 'NoLivingConnectionsError', timer(1000), throwError(failure))
        )
      )
    )
  );

  return attemptWithRetry$.toPromise() as T;
};
57+
58+
/**
 * Retries the provided Elasticsearch API call for as long as it fails with an
 * error considered transient during migrations:
 *
 * - `NoLivingConnectionsError`, `ConnectionError` or `TimeoutError`,
 * - a `ResponseError` whose status code is listed in `retryResponseStatuses`,
 * - any error whose `body.error.type` is `snapshot_in_progress_exception`.
 *
 * The call is retried every `delay` milliseconds, indefinitely, until it
 * succeeds or fails with any other error, in which case the returned promise
 * rejects with that error.
 *
 * A warning is logged the first time each distinct error message is seen.
 * This happens before the retry decision, so an error that ends up being
 * rethrown is logged as well.
 *
 * @param apiCaller - Function performing the API call; invoked once per attempt.
 * @param log - Logger receiving the "Unable to connect" warnings.
 * @param delay - Wait time between attempts, in milliseconds. Defaults to 2500.
 * @returns A promise resolving with the first successful response.
 *
 * @example
 * ```ts
 * const response = await migrationRetryCallCluster(() => client.ping(), logger);
 * ```
 *
 * @internal
 */
export const migrationRetryCallCluster = <T extends Promise<unknown>>(
  apiCaller: () => T,
  log: Logger,
  delay: number = 2500
): T => {
  // Messages already reported; a Set gives constant-time membership checks
  // with the same equality semantics as `Array.prototype.includes`.
  const loggedMessages = new Set<string>();

  return defer(() => apiCaller())
    .pipe(
      retryWhen((error$) =>
        error$.pipe(
          concatMap((error) => {
            if (!loggedMessages.has(error.message)) {
              log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`);
              loggedMessages.add(error.message);
            }
            return iif(
              () =>
                error.name === 'NoLivingConnectionsError' ||
                error.name === 'ConnectionError' ||
                error.name === 'TimeoutError' ||
                (error.name === 'ResponseError' &&
                  retryResponseStatuses.includes(error.statusCode)) ||
                error?.body?.error?.type === 'snapshot_in_progress_exception',
              // Emitting after `delay` triggers a new attempt ...
              timer(delay),
              // ... while erroring stops retrying and rejects with `error`.
              throwError(error)
            );
          })
        )
      )
    )
    .toPromise() as T;
};

0 commit comments

Comments
 (0)