|
9 | 9 | import org.apache.logging.log4j.LogManager; |
10 | 10 | import org.apache.logging.log4j.Logger; |
11 | 11 | import org.elasticsearch.ElasticsearchException; |
| 12 | +import org.elasticsearch.ResourceNotFoundException; |
12 | 13 | import org.elasticsearch.action.ActionListener; |
13 | 14 | import org.elasticsearch.action.ActionRequest; |
14 | 15 | import org.elasticsearch.action.ActionResponse; |
|
33 | 34 | import org.elasticsearch.client.internal.FilterClient; |
34 | 35 | import org.elasticsearch.client.internal.OriginSettingClient; |
35 | 36 | import org.elasticsearch.cluster.ClusterState; |
| 37 | +import org.elasticsearch.cluster.metadata.IndexMetadata; |
36 | 38 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
37 | 39 | import org.elasticsearch.cluster.metadata.MappingMetadata; |
38 | 40 | import org.elasticsearch.cluster.service.ClusterService; |
39 | 41 | import org.elasticsearch.common.Strings; |
40 | 42 | import org.elasticsearch.common.bytes.BytesArray; |
41 | 43 | import org.elasticsearch.common.settings.Settings; |
42 | 44 | import org.elasticsearch.core.CheckedFunction; |
| 45 | +import org.elasticsearch.index.IndexNotFoundException; |
43 | 46 | import org.elasticsearch.index.mapper.MapperService; |
44 | 47 | import org.elasticsearch.index.query.QueryBuilders; |
45 | 48 | import org.elasticsearch.index.reindex.BulkByScrollResponse; |
@@ -123,13 +126,9 @@ public void run() { |
123 | 126 | logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices); |
124 | 127 | GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices); |
125 | 128 | // This call does not set the origin to ensure that the user executing the policy has permission to access the source index |
126 | | - client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> { |
127 | | - try { |
128 | | - validateMappings(getIndexResponse); |
129 | | - prepareAndCreateEnrichIndex(toMappings(getIndexResponse)); |
130 | | - } catch (Exception e) { |
131 | | - l.onFailure(e); |
132 | | - } |
| 129 | + client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> { |
| 130 | + validateMappings(getIndexResponse); |
| 131 | + prepareAndCreateEnrichIndex(toMappings(getIndexResponse)); |
133 | 132 | })); |
134 | 133 | } catch (Exception e) { |
135 | 134 | listener.onFailure(e); |
@@ -565,14 +564,66 @@ private void waitForIndexGreen(final String destinationIndexName) { |
565 | 564 | ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus(); |
566 | 565 | enrichOriginClient().admin() |
567 | 566 | .cluster() |
568 | | - .health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName))); |
| 567 | + .health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName))); |
| 568 | + } |
| 569 | + |
| 570 | + /** |
| 571 | + * Ensures that the index we are about to promote at the end of a policy execution exists, is intact, and has not been damaged |
| 572 | + * during the policy execution. In some cases, it is possible for the index being constructed to be deleted during the policy execution |
| 573 | + * and recreated with invalid mappings/data. We validate that the mapping exists and that it contains the expected meta fields on it to |
| 574 | + * guard against accidental removal and recreation during policy execution. |
| 575 | + */ |
| 576 | + private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) { |
| 577 | + IndexMetadata destinationIndex = clusterState.metadata().index(destinationIndexName); |
| 578 | + if (destinationIndex == null) { |
| 579 | + throw new IndexNotFoundException( |
| 580 | + "was not able to promote it as part of executing enrich policy [" + policyName + "]", |
| 581 | + destinationIndexName |
| 582 | + ); |
| 583 | + } |
| 584 | + MappingMetadata mapping = destinationIndex.mapping(); |
| 585 | + if (mapping == null) { |
| 586 | + throw new ResourceNotFoundException( |
| 587 | + "Could not locate mapping for enrich index [{}] while completing [{}] policy run", |
| 588 | + destinationIndexName, |
| 589 | + policyName |
| 590 | + ); |
| 591 | + } |
| 592 | + Map<String, Object> mappingSource = mapping.sourceAsMap(); |
| 593 | + Object meta = mappingSource.get("_meta"); |
| 594 | + if (meta instanceof Map<?, ?> metaMap) { |
| 595 | + Object policyNameMetaField = metaMap.get(ENRICH_POLICY_NAME_FIELD_NAME); |
| 596 | + if (policyNameMetaField == null) { |
| 597 | + throw new ElasticsearchException( |
| 598 | + "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing", |
| 599 | + destinationIndexName, |
| 600 | + policyName |
| 601 | + ); |
| 602 | + } else if (policyName.equals(policyNameMetaField) == false) { |
| 603 | + throw new ElasticsearchException( |
| 604 | + "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not " |
| 605 | + + "match expected value of [{}], was [{}]", |
| 606 | + destinationIndexName, |
| 607 | + policyName, |
| 608 | + policyName, |
| 609 | + policyNameMetaField.toString() |
| 610 | + ); |
| 611 | + } |
| 612 | + } else { |
| 613 | + throw new ElasticsearchException( |
| 614 | + "Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing", |
| 615 | + destinationIndexName, |
| 616 | + policyName |
| 617 | + ); |
| 618 | + } |
569 | 619 | } |
570 | 620 |
|
571 | 621 | private void updateEnrichPolicyAlias(final String destinationIndexName) { |
572 | 622 | String enrichIndexBase = EnrichPolicy.getBaseName(policyName); |
573 | 623 | logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase); |
574 | 624 | GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase); |
575 | 625 | ClusterState clusterState = clusterService.state(); |
| 626 | + validateIndexBeforePromotion(destinationIndexName, clusterState); |
576 | 627 | String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest); |
577 | 628 | String[] aliases = aliasRequest.aliases(); |
578 | 629 | IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest(); |
|
0 commit comments