Skip to content

Commit 1606119

Browse files
authored
[7.14] [SO Migration] fix reindex race on multi-instance mode (#104516) (#104761)
* [SO Migration] fix reindex race on multi-instance mode (#104516) * fix reindex race condition * fix some IT tests * fix reindex cause detection * add integration test * update RFC * review comments * add integration test for isWriteBlockException # Conflicts: # rfcs/text/0013_saved_object_migrations.md * fix dataset for 7.14
1 parent 2c227f5 commit 1606119

File tree

11 files changed

+695
-42
lines changed

11 files changed

+695
-42
lines changed

src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.test.ts

Lines changed: 169 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,96 @@
66
* Side Public License, v 1.
77
*/
88

9-
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
9+
import * as Either from 'fp-ts/Either';
1010
import { errors as EsErrors } from '@elastic/elasticsearch';
11-
jest.mock('./catch_retryable_es_client_errors');
1211
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
12+
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
1313
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';
1414

15+
jest.mock('./catch_retryable_es_client_errors');
16+
1517
describe('bulkOverwriteTransformedDocuments', () => {
1618
beforeEach(() => {
1719
jest.clearAllMocks();
1820
});
1921

20-
// Create a mock client that rejects all methods with a 503 status code
21-
// response.
22-
const retryableError = new EsErrors.ResponseError(
23-
elasticsearchClientMock.createApiResponse({
24-
statusCode: 503,
25-
body: { error: { type: 'es_type', reason: 'es_reason' } },
26-
})
27-
);
28-
const client = elasticsearchClientMock.createInternalClient(
29-
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
30-
);
22+
it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => {
23+
const client = elasticsearchClientMock.createInternalClient(
24+
elasticsearchClientMock.createSuccessTransportRequestPromise({
25+
items: [
26+
{
27+
index: {
28+
_index: '.dolly',
29+
},
30+
},
31+
{
32+
index: {
33+
_index: '.dolly',
34+
},
35+
},
36+
],
37+
})
38+
);
39+
40+
const task = bulkOverwriteTransformedDocuments({
41+
client,
42+
index: 'new_index',
43+
transformedDocs: [],
44+
refresh: 'wait_for',
45+
});
46+
47+
const result = await task();
48+
49+
expect(Either.isRight(result)).toBe(true);
50+
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
51+
});
52+
53+
it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => {
54+
const client = elasticsearchClientMock.createInternalClient(
55+
elasticsearchClientMock.createSuccessTransportRequestPromise({
56+
items: [
57+
{
58+
index: {
59+
_index: '.dolly',
60+
},
61+
},
62+
{
63+
index: {
64+
error: {
65+
type: 'version_conflict_engine_exception',
66+
reason: 'reason',
67+
},
68+
},
69+
},
70+
],
71+
})
72+
);
73+
74+
const task = bulkOverwriteTransformedDocuments({
75+
client,
76+
index: 'new_index',
77+
transformedDocs: [],
78+
refresh: 'wait_for',
79+
});
80+
81+
const result = await task();
82+
83+
expect(Either.isRight(result)).toBe(true);
84+
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
85+
});
86+
3187
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
88+
// Create a mock client that rejects all methods with a 503 status code response.
89+
const retryableError = new EsErrors.ResponseError(
90+
elasticsearchClientMock.createApiResponse({
91+
statusCode: 503,
92+
body: { error: { type: 'es_type', reason: 'es_reason' } },
93+
})
94+
);
95+
const client = elasticsearchClientMock.createInternalClient(
96+
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
97+
);
98+
3299
const task = bulkOverwriteTransformedDocuments({
33100
client,
34101
index: 'new_index',
@@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => {
43110

44111
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
45112
});
113+
114+
it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => {
115+
const client = elasticsearchClientMock.createInternalClient(
116+
elasticsearchClientMock.createSuccessTransportRequestPromise({
117+
items: [
118+
{
119+
index: {
120+
error: {
121+
type: 'cluster_block_exception',
122+
reason:
123+
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
124+
},
125+
},
126+
},
127+
{
128+
index: {
129+
error: {
130+
type: 'cluster_block_exception',
131+
reason:
132+
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
133+
},
134+
},
135+
},
136+
],
137+
})
138+
);
139+
140+
const task = bulkOverwriteTransformedDocuments({
141+
client,
142+
index: 'new_index',
143+
transformedDocs: [],
144+
refresh: 'wait_for',
145+
});
146+
147+
const result = await task();
148+
149+
expect(Either.isLeft(result)).toBe(true);
150+
expect((result as Either.Left<any>).left).toEqual({
151+
type: 'target_index_had_write_block',
152+
});
153+
});
154+
155+
it('throws an error if any error is not a write block exceptions', async () => {
156+
(catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => {
157+
throw e;
158+
});
159+
160+
const client = elasticsearchClientMock.createInternalClient(
161+
elasticsearchClientMock.createSuccessTransportRequestPromise({
162+
items: [
163+
{
164+
index: {
165+
error: {
166+
type: 'cluster_block_exception',
167+
reason:
168+
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
169+
},
170+
},
171+
},
172+
{
173+
index: {
174+
error: {
175+
type: 'dolly_exception',
176+
reason: 'because',
177+
},
178+
},
179+
},
180+
{
181+
index: {
182+
error: {
183+
type: 'cluster_block_exception',
184+
reason:
185+
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
186+
},
187+
},
188+
},
189+
],
190+
})
191+
);
192+
193+
const task = bulkOverwriteTransformedDocuments({
194+
client,
195+
index: 'new_index',
196+
transformedDocs: [],
197+
refresh: 'wait_for',
198+
});
199+
200+
await expect(task()).rejects.toThrow();
201+
});
46202
});

src/core/server/saved_objects/migrationsv2/actions/bulk_overwrite_transformed_documents.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ import {
1515
catchRetryableEsClientErrors,
1616
RetryableEsClientError,
1717
} from './catch_retryable_es_client_errors';
18+
import { isWriteBlockException } from './es_errors';
1819
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
20+
import type { TargetIndexHadWriteBlock } from './index';
1921

2022
/** @internal */
2123
export interface BulkOverwriteTransformedDocumentsParams {
@@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams {
2426
transformedDocs: SavedObjectsRawDoc[];
2527
refresh?: estypes.Refresh;
2628
}
29+
2730
/**
2831
* Write the up-to-date transformed documents to the index, overwriting any
2932
* documents that are still on their outdated version.
@@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({
3437
transformedDocs,
3538
refresh = false,
3639
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
37-
RetryableEsClientError,
40+
RetryableEsClientError | TargetIndexHadWriteBlock,
3841
'bulk_index_succeeded'
3942
> => () => {
4043
return client
@@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({
7174
.then((res) => {
7275
// Filter out version_conflict_engine_exception since these just mean
7376
// that another instance already updated these documents
74-
const errors = (res.body.items ?? []).filter(
75-
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
76-
);
77+
const errors = (res.body.items ?? [])
78+
.filter((item) => item.index?.error)
79+
.map((item) => item.index!.error!)
80+
.filter(({ type }) => type !== 'version_conflict_engine_exception');
81+
7782
if (errors.length === 0) {
7883
return Either.right('bulk_index_succeeded' as const);
7984
} else {
85+
if (errors.every(isWriteBlockException)) {
86+
return Either.left({
87+
type: 'target_index_had_write_block' as const,
88+
});
89+
}
8090
throw new Error(JSON.stringify(errors));
8191
}
8292
})
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
import { isIncompatibleMappingException, isWriteBlockException } from './es_errors';
10+
11+
describe('isWriteBlockError', () => {
12+
it('returns true for a `index write` cluster_block_exception', () => {
13+
expect(
14+
isWriteBlockException({
15+
type: 'cluster_block_exception',
16+
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
17+
})
18+
).toEqual(true);
19+
});
20+
it('returns true for a `moving to block index write` cluster_block_exception', () => {
21+
expect(
22+
isWriteBlockException({
23+
type: 'cluster_block_exception',
24+
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`,
25+
})
26+
).toEqual(true);
27+
});
28+
it('returns false for incorrect type', () => {
29+
expect(
30+
isWriteBlockException({
31+
type: 'not_a_cluster_block_exception_at_all',
32+
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
33+
})
34+
).toEqual(false);
35+
});
36+
});
37+
38+
describe('isIncompatibleMappingExceptionError', () => {
39+
it('returns true for `strict_dynamic_mapping_exception` errors', () => {
40+
expect(
41+
isIncompatibleMappingException({
42+
type: 'strict_dynamic_mapping_exception',
43+
reason: 'idk',
44+
})
45+
).toEqual(true);
46+
});
47+
48+
it('returns true for `mapper_parsing_exception` errors', () => {
49+
expect(
50+
isIncompatibleMappingException({
51+
type: 'mapper_parsing_exception',
52+
reason: 'idk',
53+
})
54+
).toEqual(true);
55+
});
56+
});
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
export interface EsErrorCause {
10+
type: string;
11+
reason: string;
12+
}
13+
14+
export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => {
15+
return (
16+
type === 'cluster_block_exception' &&
17+
reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null
18+
);
19+
};
20+
21+
export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => {
22+
return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';
23+
};

src/core/server/saved_objects/migrationsv2/actions/integration_tests/actions.test.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,14 +181,17 @@ describe('migration actions', () => {
181181
{ _source: { title: 'doc 3' } },
182182
{ _source: { title: 'doc 4' } },
183183
] as unknown) as SavedObjectsRawDoc[];
184-
await expect(
185-
bulkOverwriteTransformedDocuments({
186-
client,
187-
index: 'new_index_without_write_block',
188-
transformedDocs: sourceDocs,
189-
refresh: 'wait_for',
190-
})()
191-
).rejects.toMatchObject(expect.anything());
184+
185+
const res = (await bulkOverwriteTransformedDocuments({
186+
client,
187+
index: 'new_index_without_write_block',
188+
transformedDocs: sourceDocs,
189+
refresh: 'wait_for',
190+
})()) as Either.Left<unknown>;
191+
192+
expect(res.left).toEqual({
193+
type: 'target_index_had_write_block',
194+
});
192195
});
193196
it('resolves left index_not_found_exception when the index does not exist', async () => {
194197
expect.assertions(1);
@@ -1094,6 +1097,7 @@ describe('migration actions', () => {
10941097
return Either.right({ processedDocs });
10951098
};
10961099
}
1100+
10971101
const transformTask = transformDocs({
10981102
transformRawDocs: innerTransformRawDocs,
10991103
outdatedDocuments: originalDocs,
@@ -1496,7 +1500,7 @@ describe('migration actions', () => {
14961500
}
14971501
`);
14981502
});
1499-
it('rejects if there are errors', async () => {
1503+
it('resolves left if there are write_block errors', async () => {
15001504
const newDocs = ([
15011505
{ _source: { title: 'doc 5' } },
15021506
{ _source: { title: 'doc 6' } },
@@ -1509,7 +1513,14 @@ describe('migration actions', () => {
15091513
transformedDocs: newDocs,
15101514
refresh: 'wait_for',
15111515
})()
1512-
).rejects.toMatchObject(expect.anything());
1516+
).resolves.toMatchInlineSnapshot(`
1517+
Object {
1518+
"_tag": "Left",
1519+
"left": Object {
1520+
"type": "target_index_had_write_block",
1521+
},
1522+
}
1523+
`);
15131524
});
15141525
});
15151526
});

0 commit comments

Comments
 (0)