-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Use ILM for Watcher history deletion #37443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f37b3c9
a01c0db
59b7ab7
474b5c3
9aa2f06
bbcfdb9
f51ca5d
87410da
31c69f5
827bcb0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.core.indexlifecycle; | ||
|
||
import org.elasticsearch.ElasticsearchParseException; | ||
import org.elasticsearch.common.bytes.BytesArray; | ||
import org.elasticsearch.common.bytes.BytesReference; | ||
import org.elasticsearch.common.compress.NotXContentException; | ||
import org.elasticsearch.common.io.Streams; | ||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.common.xcontent.XContentHelper; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
/** | ||
* A utility class used for loading index lifecycle policies from the resource classpath | ||
*/ | ||
public class LifecyclePolicyUtils { | ||
|
||
private LifecyclePolicyUtils() {}; | ||
|
||
/** | ||
* Loads a built-in index lifecycle policy and returns its source. | ||
*/ | ||
public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) { | ||
try { | ||
BytesReference source = load(resource); | ||
validate(source); | ||
|
||
try (XContentParser parser = XContentType.JSON.xContent() | ||
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) { | ||
return LifecyclePolicy.parse(parser, name); | ||
} | ||
} catch (Exception e) { | ||
throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e); | ||
} | ||
} | ||
|
||
/** | ||
* Loads a resource from the classpath and returns it as a {@link BytesReference} | ||
*/ | ||
private static BytesReference load(String name) throws IOException { | ||
try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) { | ||
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { | ||
Streams.copy(is, out); | ||
return new BytesArray(out.toByteArray()); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Parses and validates that the source is not empty. | ||
*/ | ||
private static void validate(BytesReference source) { | ||
if (source == null) { | ||
throw new ElasticsearchParseException("policy must not be null"); | ||
} | ||
|
||
try { | ||
XContentHelper.convertToMap(source, false, XContentType.JSON).v2(); | ||
} catch (NotXContentException e) { | ||
throw new ElasticsearchParseException("policy must not be empty"); | ||
} catch (Exception e) { | ||
throw new ElasticsearchParseException("invalid policy", e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{ | ||
"phases": { | ||
"delete": { | ||
"min_age": "7d", | ||
"actions": { | ||
"delete": {} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,20 @@ | |
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.gateway.GatewayService; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.xpack.core.XPackClient; | ||
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; | ||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; | ||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils; | ||
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; | ||
import org.elasticsearch.xpack.core.template.TemplateUtils; | ||
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.Executor; | ||
|
@@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { | |
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES | ||
}; | ||
|
||
public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json"); | ||
|
||
private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class); | ||
|
||
private final Client client; | ||
private final ThreadPool threadPool; | ||
private final TemplateConfig[] indexTemplates; | ||
private final NamedXContentRegistry xContentRegistry; | ||
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>(); | ||
private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean(); | ||
|
||
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) { | ||
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client, | ||
NamedXContentRegistry xContentRegistry) { | ||
this.client = client; | ||
this.threadPool = threadPool; | ||
this.indexTemplates = TEMPLATE_CONFIGS; | ||
this.xContentRegistry = xContentRegistry; | ||
clusterService.addListener(this); | ||
} | ||
|
||
|
@@ -82,6 +95,7 @@ public void clusterChanged(ClusterChangedEvent event) { | |
|
||
if (event.localNodeMaster() || localNodeVersionAfterMaster) { | ||
addTemplatesIfMissing(state); | ||
addIndexLifecyclePolicyIfMissing(state); | ||
dakrone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
|
@@ -127,6 +141,54 @@ public void onFailure(Exception e) { | |
}); | ||
} | ||
|
||
// Package visible for testing | ||
LifecyclePolicy loadWatcherHistoryPolicy() { | ||
return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry); | ||
} | ||
|
||
private void addIndexLifecyclePolicyIfMissing(ClusterState state) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any concerns about multiple nodes entering this path and putting the policy multiple times ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this only happens when the local node is master, it should only get called once with the atomic There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahh.. thanks, I missed this will only run on the master. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor nit: it is not only the master node, but is also put when this node is newer than the master node. The reason for this is, that with regards to the watcher history, a node that is not the master node might be updated first in a cluster, and that one could write the watch history in a format which requires a new watch history template. I am not sure if this applies to ILM as well, as this is an administrative task compared to the fact that the watch history is written by non master nodes. |
||
if (historyPolicyCreationInProgress.compareAndSet(false, true)) { | ||
final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy(); | ||
|
||
Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE)); | ||
final boolean needsUpdating = maybeMeta | ||
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName()))) | ||
.isPresent() == false; // If there is no policy then one needs to be put; | ||
|
||
if (needsUpdating) { | ||
putPolicy(policyOnDisk, historyPolicyCreationInProgress); | ||
} else { | ||
historyPolicyCreationInProgress.set(false); | ||
} | ||
dakrone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) { | ||
final Executor executor = threadPool.generic(); | ||
executor.execute(() -> { | ||
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); | ||
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request, | ||
new ActionListener<PutLifecycleAction.Response>() { | ||
@Override | ||
public void onResponse(PutLifecycleAction.Response response) { | ||
creationCheck.set(false); | ||
if (response.isAcknowledged() == false) { | ||
logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged", | ||
policy.getName()); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
creationCheck.set(false); | ||
logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]", | ||
policy.getName()), e); | ||
} | ||
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener)); | ||
}); | ||
} | ||
|
||
public static boolean validate(ClusterState state) { | ||
return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) && | ||
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) && | ||
|
@@ -153,9 +215,19 @@ public String getTemplateName() { | |
|
||
public byte[] load() { | ||
String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION, | ||
Pattern.quote("${xpack.watcher.template.version}")); | ||
Pattern.quote("${xpack.watcher.template.version}")); | ||
assert template != null && template.length() > 0; | ||
return template.getBytes(StandardCharsets.UTF_8); | ||
} | ||
} | ||
public static class PolicyConfig { | ||
|
||
private final String policyName; | ||
private String fileName; | ||
|
||
PolicyConfig(String templateName, String fileName) { | ||
this.policyName = templateName; | ||
this.fileName = fileName; | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.