Skip to content

Delete watcher legacy templates #80915

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.HeaderWarning;
Expand Down Expand Up @@ -177,6 +178,76 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
);
}

/**
* Removes the provided legacy index templates if they exist. This only matches the templates using exact name, wildcards are not
* supported.
*
* The provided templates must exist in the cluster, otherwise an {@link IndexTemplateMissingException} is reported.
*/
public static void removeTemplates(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rename to removeLegacyTemplates? To emphasise this batch removal method is for legacy templates only.

ClusterService clusterService,
Set<String> templateNames,
ActionListener<AcknowledgedResponse> listener
) {
clusterService.submitStateUpdateTask(
"remove-templates [" + String.join(",", templateNames) + "]",
new ClusterStateUpdateTask(Priority.URGENT, MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerRemoveTemplates(currentState, templateNames);
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
);
}

/**
* Removes the provided legacy templates. If an index name that doesn't exist is provided, it'll fail with
* {@link IndexTemplateMissingException}
*/
// Package visible for testing
static ClusterState innerRemoveTemplates(ClusterState currentState, Set<String> templateNames) {
Set<String> existingTemplatesToDelete = new HashSet<>(templateNames.size(), 1.0f);
ImmutableOpenMap<String, IndexTemplateMetadata> clusterLegacyTemplates = currentState.getMetadata().templates();
Set<String> missingNames = null;
for (String legacyTemplate : templateNames) {
if (clusterLegacyTemplates.containsKey(legacyTemplate)) {
existingTemplatesToDelete.add(legacyTemplate);
} else {
if (missingNames == null) {
missingNames = new HashSet<>();
}

missingNames.add(legacyTemplate);
}
}

if (missingNames != null) {
throw new IndexTemplateMissingException(String.join(",", missingNames));
}

if (existingTemplatesToDelete.isEmpty() == false) {
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
for (String template : existingTemplatesToDelete) {
logger.info("removing template [{}]", template);
metadata.removeTemplate(template);
}
return ClusterState.builder(currentState).metadata(metadata).build();
} else {
return currentState;
}
}

/**
* Add the given component template to the cluster state. If {@code create} is true, an
* exception will be thrown if the component template already exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2128,6 +2128,123 @@ public void onFailure(Exception e) {
return throwables;
}

public void testRemoveMultipleLegacyIndexTemplates() throws Exception {
MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
AtomicReference<Throwable> failure = new AtomicReference<>();

{
PutRequest request = new PutRequest("api", "foo");
request.patterns(singletonList("fo*"));
request.mappings(new CompressedXContent("{}"));
final CountDownLatch latch = new CountDownLatch(1);
metadataIndexTemplateService.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetadataIndexTemplateService.PutResponse response) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage(), e);
failure.set(e);
latch.countDown();
}
});
latch.await(10, TimeUnit.SECONDS);
assertThat("Unable to put template due to: " + failure.get(), failure.get(), nullValue());
}

{
PutRequest request = new PutRequest("api", "bar");
request.patterns(singletonList("ba*"));
request.mappings(new CompressedXContent("{}"));

final CountDownLatch latch = new CountDownLatch(1);
metadataIndexTemplateService.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetadataIndexTemplateService.PutResponse response) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage(), e);
failure.set(e);
latch.countDown();
}
});
latch.await(10, TimeUnit.SECONDS);
assertThat("Unable to put template due to: " + failure.get(), failure.get(), nullValue());
}

ClusterService clusterService = getInstanceFromNode(ClusterService.class);
ClusterState nextState = MetadataIndexTemplateService.innerRemoveTemplates(clusterService.state(), Set.of("foo", "bar"));
ImmutableOpenMap<String, IndexTemplateMetadata> templates = nextState.metadata().templates();
assertThat(templates.containsKey("foo"), is(false));
assertThat(templates.containsKey("bar"), is(false));
}

public void testRemovingLegacyMissingTemplatesFails() throws Exception {
MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
AtomicReference<Throwable> failure = new AtomicReference<>();
{
PutRequest request = new PutRequest("api", "foo");
request.patterns(singletonList("fo*"));
request.mappings(new CompressedXContent("{}"));

final CountDownLatch latch = new CountDownLatch(1);
metadataIndexTemplateService.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetadataIndexTemplateService.PutResponse response) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage(), e);
failure.set(e);
latch.countDown();
}
});
latch.await(10, TimeUnit.SECONDS);
assertThat("Unable to put template due to: " + failure.get(), failure.get(), nullValue());
}

{
PutRequest request = new PutRequest("api", "bar");
request.patterns(singletonList("ba*"));
request.mappings(new CompressedXContent("{}"));
final CountDownLatch latch = new CountDownLatch(1);
metadataIndexTemplateService.putTemplate(request, new MetadataIndexTemplateService.PutListener() {
@Override
public void onResponse(MetadataIndexTemplateService.PutResponse response) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error(e.getMessage(), e);
failure.set(e);
latch.countDown();
}
});
latch.await(10, TimeUnit.SECONDS);
assertThat("Unable to put template due to: " + failure.get(), failure.get(), nullValue());
}

ClusterService clusterService = getInstanceFromNode(ClusterService.class);
IndexTemplateMissingException indexTemplateMissingException = expectThrows(
IndexTemplateMissingException.class,
() -> MetadataIndexTemplateService.innerRemoveTemplates(clusterService.state(), Set.of("foo", "bar", "missing", "other_mssing"))
);
assertThat(indexTemplateMissingException.getMessage(), is("index_template [missing,other_mssing] missing"));

// let's also test the templates that did exists were not removed
ImmutableOpenMap<String, IndexTemplateMetadata> templates = clusterService.state().metadata().templates();
assertThat(templates.containsKey("foo"), is(true));
assertThat(templates.containsKey("bar"), is(true));
}

private List<Throwable> putTemplateDetail(PutRequest request) throws Exception {
MetadataIndexTemplateService service = getMetadataIndexTemplateService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -26,11 +31,13 @@
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -41,14 +48,21 @@
public class WatcherLifeCycleService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);

public static final Set<String> LEGACY_WATCHER_INDEX_TEMPLATES = Set.of(".watches", ".triggered_watches");

private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private final ClusterService clusterService;
private volatile WatcherService watcherService;
private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING);
private final AtomicBoolean legacyTemplatesDeleteInProgress = new AtomicBoolean(false);
private final AtomicBoolean checkForLegacyTemplates = new AtomicBoolean(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should checking whether a removal of templates is in progress be part of the functionality that MetadataIndexTemplateService.removeTemplates(...) method offers?


WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) {
this.watcherService = watcherService;
this.clusterService = clusterService;
clusterService.addListener(this);
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an watch is scheduled.
Expand Down Expand Up @@ -96,6 +110,39 @@ public void clusterChanged(ClusterChangedEvent event) {
return;
}

if (event.localNodeMaster() && checkForLegacyTemplates.get()) {
List<String> existingTemplatesToDelete = new ArrayList<>(LEGACY_WATCHER_INDEX_TEMPLATES.size());
ImmutableOpenMap<String, IndexTemplateMetadata> clusterLegacyTemplates = event.state().getMetadata().templates();
for (String legacyWatcherIndexTemplate : LEGACY_WATCHER_INDEX_TEMPLATES) {
if (clusterLegacyTemplates.containsKey(legacyWatcherIndexTemplate)) {
existingTemplatesToDelete.add(legacyWatcherIndexTemplate);
}
}

if (existingTemplatesToDelete.isEmpty() == false) {
// if someone else is executing the deletion of templates (due to fast successive cluster updates), we'll skip doing so
if (legacyTemplatesDeleteInProgress.compareAndSet(false, true) == false) {
return;
}

MetadataIndexTemplateService.removeTemplates(clusterService, LEGACY_WATCHER_INDEX_TEMPLATES, ActionListener.wrap(r -> {
legacyTemplatesDeleteInProgress.set(false);
// we've done it so we shouldn't check anymore
checkForLegacyTemplates.set(false);
logger.debug("deleted legacy Watcher index templates [{}]", String.join(",", LEGACY_WATCHER_INDEX_TEMPLATES));
}, e -> {
legacyTemplatesDeleteInProgress.set(false);
logger.debug(
new ParameterizedMessage(
"unable to delete legacy Watcher index templates [{}]",
String.join(",", LEGACY_WATCHER_INDEX_TEMPLATES)
),
e
);
}));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should checkForLegacyTemplates be set to false when existingTemplatesToDelete list is empty?

}

boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
boolean isStoppedOrStopping = stopStates.contains(this.state.get());
// if this is not a data node, we need to start it ourselves possibly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -562,6 +563,13 @@ public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
verify(watcherService).reload(eq(event.state()), anyString());
}

public void testLegacyWatcherTemplatesToDelete() {
assertThat(WatcherLifeCycleService.LEGACY_WATCHER_INDEX_TEMPLATES.size(), is(2));
Iterator<String> legacyTemplatesIterator = WatcherLifeCycleService.LEGACY_WATCHER_INDEX_TEMPLATES.iterator();
assertThat(legacyTemplatesIterator.next(), is(".watches"));
assertThat(legacyTemplatesIterator.next(), is(".triggered_watches"));
}

private void startWatcher() {
Index index = new Index(Watch.INDEX, "uuid");
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

public class WatcherRestartIT extends AbstractUpgradeTestCase {
Expand Down Expand Up @@ -46,4 +49,30 @@ private void ensureWatcherStarted() throws Exception {
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\"")));
});
}

public void testEnsureWatcherDeletesLegacyTemplates() throws Exception {
client().performRequest(new Request("POST", "/_watcher/_start"));
ensureWatcherStarted();

// All the legacy ML templates we created over the years should be deleted now they're no longer needed
assertBusy(() -> {
Request request = new Request("GET", "/_template/*watches*");
try {
Response response = client().performRequest(request);
Map<String, Object> responseLevel = entityAsMap(response);
assertNotNull(responseLevel);

assertThat(responseLevel.containsKey(".watches"), is(false));
assertThat(responseLevel.containsKey(".triggered_watches"), is(false));
} catch (ResponseException e) {
// Not found is fine
assertThat(
"Unexpected failure getting templates: " + e.getResponse().getStatusLine(),
e.getResponse().getStatusLine().getStatusCode(),
is(404)
);
}
});
}

}