Skip to content

Commit 225db31

Browse files
authored
Validate enrich index before completing policy execution (elastic#100106)
This PR adds a validation step to the end of an enrich policy run to ensure the integrity of the enrich index that is about to be promoted.
1 parent 1369ff2 commit 225db31

File tree

3 files changed

+138
-8
lines changed

3 files changed

+138
-8
lines changed

docs/changelog/100106.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 100106
2+
summary: Validate enrich index before completing policy execution
3+
area: Ingest Node
4+
type: bug
5+
issues: []

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.ResourceNotFoundException;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionRequest;
1415
import org.elasticsearch.action.ActionResponse;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.util.Maps;
4546
import org.elasticsearch.common.util.iterable.Iterables;
47+
import org.elasticsearch.index.IndexNotFoundException;
4648
import org.elasticsearch.index.IndexVersion;
4749
import org.elasticsearch.index.mapper.Mapper;
4850
import org.elasticsearch.index.mapper.MapperService;
@@ -132,13 +134,9 @@ public void run() {
132134
logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices);
133135
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices);
134136
// This call does not set the origin to ensure that the user executing the policy has permission to access the source index
135-
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> {
136-
try {
137-
validateMappings(getIndexResponse);
138-
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
139-
} catch (Exception e) {
140-
l.onFailure(e);
141-
}
137+
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
138+
validateMappings(getIndexResponse);
139+
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
142140
}));
143141
} catch (Exception e) {
144142
listener.onFailure(e);
@@ -624,14 +622,66 @@ private void waitForIndexGreen(final String destinationIndexName) {
624622
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
625623
enrichOriginClient().admin()
626624
.cluster()
627-
.health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
625+
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
626+
}
627+
628+
/**
629+
* Ensures that the index we are about to promote at the end of a policy execution exists, is intact, and has not been damaged
630+
* during the policy execution. In some cases, it is possible for the index being constructed to be deleted during the policy execution
631+
* and recreated with invalid mappings/data. We validate that the mapping exists and that it contains the expected meta fields on it to
632+
* guard against accidental removal and recreation during policy execution.
633+
*/
634+
private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) {
635+
IndexMetadata destinationIndex = clusterState.metadata().index(destinationIndexName);
636+
if (destinationIndex == null) {
637+
throw new IndexNotFoundException(
638+
"was not able to promote it as part of executing enrich policy [" + policyName + "]",
639+
destinationIndexName
640+
);
641+
}
642+
MappingMetadata mapping = destinationIndex.mapping();
643+
if (mapping == null) {
644+
throw new ResourceNotFoundException(
645+
"Could not locate mapping for enrich index [{}] while completing [{}] policy run",
646+
destinationIndexName,
647+
policyName
648+
);
649+
}
650+
Map<String, Object> mappingSource = mapping.sourceAsMap();
651+
Object meta = mappingSource.get("_meta");
652+
if (meta instanceof Map<?, ?> metaMap) {
653+
Object policyNameMetaField = metaMap.get(ENRICH_POLICY_NAME_FIELD_NAME);
654+
if (policyNameMetaField == null) {
655+
throw new ElasticsearchException(
656+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing",
657+
destinationIndexName,
658+
policyName
659+
);
660+
} else if (policyName.equals(policyNameMetaField) == false) {
661+
throw new ElasticsearchException(
662+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not "
663+
+ "match expected value of [{}], was [{}]",
664+
destinationIndexName,
665+
policyName,
666+
policyName,
667+
policyNameMetaField.toString()
668+
);
669+
}
670+
} else {
671+
throw new ElasticsearchException(
672+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing",
673+
destinationIndexName,
674+
policyName
675+
);
676+
}
628677
}
629678

630679
private void updateEnrichPolicyAlias(final String destinationIndexName) {
631680
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
632681
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
633682
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
634683
ClusterState clusterState = clusterService.state();
684+
validateIndexBeforePromotion(destinationIndexName, clusterState);
635685
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest);
636686
String[] aliases = aliasRequest.aliases();
637687
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
1818
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1919
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
20+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
21+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
2022
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
2123
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
2224
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -2253,6 +2255,79 @@ public void testEnrichNestedField() throws Exception {
22532255
""");
22542256
}
22552257

2258+
public void testRunnerValidatesIndexIntegrity() throws Exception {
2259+
final String sourceIndex = "source-index";
2260+
IndexResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source("""
2261+
{
2262+
"field1": "value1",
2263+
"field2": 2,
2264+
"field3": "ignored",
2265+
"field4": "ignored",
2266+
"field5": "value5"
2267+
}""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet();
2268+
assertEquals(RestStatus.CREATED, indexRequest.status());
2269+
2270+
SearchResponse sourceSearchResponse = client().search(
2271+
new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
2272+
).actionGet();
2273+
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
2274+
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
2275+
assertNotNull(sourceDocMap);
2276+
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
2277+
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
2278+
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
2279+
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
2280+
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
2281+
2282+
List<String> enrichFields = List.of("field2", "field5");
2283+
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
2284+
String policyName = "test1";
2285+
2286+
final long createTime = randomNonNegativeLong();
2287+
String createdEnrichIndex = ".enrich-test1-" + createTime;
2288+
final AtomicReference<Exception> exception = new AtomicReference<>();
2289+
final CountDownLatch latch = new CountDownLatch(1);
2290+
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
2291+
2292+
// Wrap the client so that when we receive the reindex action, we delete the index then resume operation. This mimics an invalid
2293+
// state for the resulting index.
2294+
Client client = new FilterClient(client()) {
2295+
@Override
2296+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
2297+
ActionType<Response> action,
2298+
Request request,
2299+
ActionListener<Response> listener
2300+
) {
2301+
if (action.equals(EnrichReindexAction.INSTANCE)) {
2302+
super.doExecute(
2303+
DeleteIndexAction.INSTANCE,
2304+
new DeleteIndexRequest(createdEnrichIndex),
2305+
listener.delegateFailureAndWrap((delegate, response) -> {
2306+
if (response.isAcknowledged() == false) {
2307+
fail("Enrich index should have been deleted but was not");
2308+
}
2309+
super.doExecute(action, request, delegate);
2310+
})
2311+
);
2312+
} else {
2313+
super.doExecute(action, request, listener);
2314+
}
2315+
}
2316+
};
2317+
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex);
2318+
2319+
logger.info("Starting policy run");
2320+
enrichPolicyRunner.run();
2321+
latch.await();
2322+
Exception runnerException = exception.get();
2323+
if (runnerException == null) {
2324+
fail("Expected the runner to fail when the underlying index was deleted during policy execution!");
2325+
}
2326+
assertThat(runnerException, is(instanceOf(ElasticsearchException.class)));
2327+
assertThat(runnerException.getMessage(), containsString("Could not verify enrich index"));
2328+
assertThat(runnerException.getMessage(), containsString("mapping meta field missing"));
2329+
}
2330+
22562331
private EnrichPolicyRunner createPolicyRunner(
22572332
String policyName,
22582333
EnrichPolicy policy,

0 commit comments

Comments
 (0)