Skip to content

Filter enrich policy index deletes to just the policy's associated indices #82568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,32 @@ public static String getBaseName(String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName;
}

/**
* Given a policy name and a timestamp, return the enrich index name that should be used.
*
* @param policyName the name of the policy
* @param nowTimestamp the current time
* @return an enrich index name
*/
public static String getIndexName(String policyName, long nowTimestamp) {
Objects.nonNull(policyName);
return EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
}

/**
* Tests whether the named policy is associated with the named index according to the naming
* pattern that exists between policy names and index names.
*
* @param policyName the policy name
* @param indexName the index name
* @return true if and only if the named policy is associated with the named index
*/
public static boolean isPolicyForIndex(String policyName, String indexName) {
Objects.nonNull(policyName);
Objects.nonNull(indexName);
return indexName.matches(EnrichPolicy.getBaseName(policyName) + "-" + "\\d+");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuild

private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
String enrichIndexName = EnrichPolicy.getIndexName(policyName, nowTimestamp);
Settings enrichIndexSettings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -82,23 +83,24 @@ protected void masterOperation(
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
final String policyName = request.getName();
final EnrichPolicy policy = EnrichStore.getPolicy(policyName, state); // ensure the policy exists first
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
throw new ResourceNotFoundException("policy [{}] not found", policyName);
}

enrichPolicyLocks.lockPolicy(request.getName());
enrichPolicyLocks.lockPolicy(policyName);
try {
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
List<String> pipelinesWithProcessors = new ArrayList<>();
final List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
final List<String> pipelinesWithProcessors = new ArrayList<>();

for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors = ingestService.getProcessorsInPipeline(
pipelineConfiguration.getId(),
AbstractEnrichProcessor.class
);
for (AbstractEnrichProcessor processor : enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
if (processor.getPolicyName().equals(policyName)) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
}
}
Expand All @@ -108,26 +110,30 @@ protected void masterOperation(
throw new ElasticsearchStatusException(
"Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT,
request.getName(),
policyName,
pipelinesWithProcessors
);
}
} catch (Exception e) {
enrichPolicyLocks.releasePolicy(request.getName());
enrichPolicyLocks.releasePolicy(policyName);
listener.onFailure(e);
return;
}

GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(request.getName()) + "-*")
final GetIndexRequest indices = new GetIndexRequest().indices(EnrichPolicy.getBaseName(policyName) + "-*")
.indicesOptions(IndicesOptions.lenientExpand());

String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, indices);

deleteIndicesAndPolicy(concreteIndices, request.getName(), ActionListener.wrap((response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
// the wildcard expansion could be too wide (e.g. in the case of a policy named policy-1 and another named policy-10),
// so we need to filter down to just the concrete indices that are actually indices for this policy
concreteIndices = Stream.of(concreteIndices).filter(i -> EnrichPolicy.isPolicyForIndex(policyName, i)).toArray(String[]::new);

deleteIndicesAndPolicy(concreteIndices, policyName, ActionListener.wrap((response) -> {
enrichPolicyLocks.releasePolicy(policyName);
listener.onResponse(response);
}, (exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
enrichPolicyLocks.releasePolicy(policyName);
listener.onFailure(exc);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,19 @@ public static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPoli
assertThat(newInstance.getMatchField(), equalTo(expectedInstance.getMatchField()));
assertThat(newInstance.getEnrichFields(), equalTo(expectedInstance.getEnrichFields()));
}

public void testIsPolicyForIndex() {
String policy1 = "policy-1";
String policy2 = "policy-10"; // the first policy is a prefix of the second policy!

String index1 = EnrichPolicy.getIndexName(policy1, 1000);
String index2 = EnrichPolicy.getIndexName(policy2, 2000);

assertTrue(EnrichPolicy.isPolicyForIndex(policy1, index1));
assertTrue(EnrichPolicy.isPolicyForIndex(policy2, index2));

assertFalse(EnrichPolicy.isPolicyForIndex(policy1, index2));
assertFalse(EnrichPolicy.isPolicyForIndex(policy2, index1));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public void cleanupPolicy() {

public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
String fakeId = "fake-id";
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
createIndex(EnrichPolicy.getIndexName(fakeId, 1001));
createIndex(EnrichPolicy.getIndexName(fakeId, 1002));

final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
Expand Down Expand Up @@ -128,13 +128,13 @@ public void testDeleteIsNotLocked() throws Exception {
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
}

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
createIndex(EnrichPolicy.getIndexName(name, 1001));
createIndex(EnrichPolicy.getIndexName(name, 1002));

client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1002))
.get();

final CountDownLatch latch = new CountDownLatch(1);
Expand All @@ -160,7 +160,7 @@ public void onFailure(final Exception e) {
() -> client().admin()
.indices()
.prepareGetIndex()
.setIndices(EnrichPolicy.getBaseName(name) + "-foo1", EnrichPolicy.getBaseName(name) + "-foo2")
.setIndices(EnrichPolicy.getIndexName(name, 1001), EnrichPolicy.getIndexName(name, 1001))
.get()
);

Expand All @@ -183,8 +183,8 @@ public void testDeleteLocked() throws InterruptedException {
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());

createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
createIndex(EnrichPolicy.getIndexName(name, 1001));
createIndex(EnrichPolicy.getIndexName(name, 1002));

EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
Expand Down Expand Up @@ -241,4 +241,50 @@ public void onFailure(final Exception e) {
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
}

public void testDeletePolicyPrefixes() throws InterruptedException {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);

String name = "my-policy";
String otherName = "my-policy-two"; // the first policy is a prefix of this one

final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
AtomicReference<Exception> error;
error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
error = saveEnrichPolicy(otherName, policy, clusterService);
assertThat(error.get(), nullValue());

// create an index for the *other* policy
createIndex(EnrichPolicy.getIndexName(otherName, 1001));

{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();

ActionTestUtils.execute(transportAction, null, new DeleteEnrichPolicyAction.Request(name), new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}

public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());

assertNull(EnrichStore.getPolicy(name, clusterService.state()));

// deleting name policy should have no effect on the other policy
assertNotNull(EnrichStore.getPolicy(otherName, clusterService.state()));

// and the index associated with the other index should be unaffected
client().admin().indices().prepareGetIndex().setIndices(EnrichPolicy.getIndexName(otherName, 1001)).get();
}
}
}