ILM retryable steps #48010

Closed
wants to merge 3 commits
@@ -1143,7 +1143,7 @@ public String getResourceType() {
}

// lower cases and adds underscores to transitions in a name
private static String toUnderscoreCase(String value) {
public static String toUnderscoreCase(String value) {
StringBuilder sb = new StringBuilder();
boolean changed = false;
for (int i = 0; i < value.length(); i++) {
@@ -38,6 +38,14 @@ public StepKey getNextStepKey() {
return nextStepKey;
}

/**
 * Indicates whether the step can be retried when it encounters an execution error.
 */
public boolean isRetryable() {
// TODO: default to false
return true;
}

@Override
public int hashCode() {
return Objects.hash(key, nextStepKey);
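For reference, a concrete step that must not be retried automatically would simply override the new hook. The sketch below is illustrative only: the class name is hypothetical and not part of this change; it assumes the default eventually flips to false as the TODO above suggests.

// Illustrative sketch only: "ExampleNonRetryableStep" is a hypothetical class,
// not part of this PR. It shows how a concrete step could opt out of retries.
public class ExampleNonRetryableStep extends Step {

    public ExampleNonRetryableStep(StepKey key, StepKey nextStepKey) {
        super(key, nextStepKey);
    }

    @Override
    public boolean isRetryable() {
        // failures in this step need manual intervention, so ILM should not retry
        return false;
    }
}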
@@ -861,6 +861,46 @@ public void testExplainFilters() throws Exception {
});
}

public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception {
String firstIndex = index + "-000001";

Request updateLifecyclePollSetting = new Request("PUT", "_cluster/settings");
updateLifecyclePollSetting.setJsonEntity("{" +
" \"transient\": {\n" +
" \"indices.lifecycle.poll_interval\" : \"1s\" \n" +
" }\n" +
"}");
client().performRequest(updateLifecyclePollSetting);

createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null));

// create the index as read-only and associate the ILM policy with it
createIndexWithSettings(
firstIndex,
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(LifecycleSettings.LIFECYCLE_NAME, policy)
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias")
.put("index.blocks.read_only_allow_delete", true),
true
);

// ILM should run into an error step as the rollover could not be executed
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex).getName(), equalTo(ErrorStep.NAME)));

// remove the readonly block
Request allowWritesOnIndexSettingUpdate = new Request("PUT", firstIndex + "/_settings");
allowWritesOnIndexSettingUpdate.setJsonEntity("{" +
" \"index\": {\n" +
" \"blocks.read_only_allow_delete\" : \"false\" \n" +
" }\n" +
"}");
client().performRequest(allowWritesOnIndexSettingUpdate);

// the index is no longer read-only, so ILM should eventually complete the policy
assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY)));
}

public void testILMRolloverOnManuallyRolledIndex() throws Exception {
String originalIndex = index + "-000001";
String secondIndex = index + "-000002";
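While the index is stuck in the error step, the failure can be inspected with the ILM explain API. A minimal sketch in the same low-level client style the test uses; the commented expectations are assumptions about this scenario, not assertions taken from the PR:

// Illustrative sketch, not part of the test above: inspect why the index is
// stuck in the ERROR step via the ILM explain API.
Request explainRequest = new Request("GET", firstIndex + "/_ilm/explain");
Response explainResponse = client().performRequest(explainRequest);
// The response body is expected to report "step": "ERROR", a "failed_step"
// pointing at the rollover step, and "step_info" describing the
// cluster_block_exception (exact values depend on the run).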
@@ -9,19 +9,26 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
@@ -42,13 +49,20 @@
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
import org.elasticsearch.xpack.core.ilm.action.RetryAction;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
import static org.elasticsearch.ElasticsearchException.toUnderscoreCase;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;

@@ -60,6 +74,8 @@ public class IndexLifecycleRunner {
private PolicyStepsRegistry stepRegistry;
private ClusterService clusterService;
private LongSupplier nowSupplier;
// TODO: ugh, state
private Map<Step, ActionListener<RetryAction.Response>> inProgressRetries = new ConcurrentHashMap<>();

public IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ClusterService clusterService,
ThreadPool threadPool, LongSupplier nowSupplier) {
@@ -118,7 +134,25 @@ public void runPeriodicStep(String policy, IndexMetaData indexMetaData) {
logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index);
return;
} else if (currentStep instanceof ErrorStep) {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
Step failedStep = stepRegistry.getStep(indexMetaData, new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(),
lifecycleState.getFailedStep()));
if (failedStep.isRetryable()) {
if (inProgressRetries.containsKey(failedStep)) {
logger.debug("a retry for policy [{}] for index [{}] on step [{}] is in progress", policy, index,
lifecycleState.getFailedStep());
return;
}
ActionListener<RetryAction.Response> listener = createRetryFailedStepListener(policy, indexMetaData, failedStep);
if (isFailedStepFailureRetryable(lifecycleState)) {
// todo the key in this map needs to have the index and possibly policy name as well (but hopefully we won't need this map)
inProgressRetries.put(failedStep, listener);
retryFailedStep("ilm-retry-failed-step", new RetryAction.Request(index), listener);
} else {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
}
} else {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
}
return;
}

@@ -154,6 +188,100 @@ public void onFailure(Exception e) {
}
}

private ActionListener<RetryAction.Response> createRetryFailedStepListener(String policy, IndexMetaData indexMetaData,
Step failedStep) {
return new ActionListener<>() {
private static final int MAX_NUMBER_OF_RETRIES = 10;
private final Iterator<TimeValue> backoffIterator = exponentialBackoff(TimeValue.timeValueSeconds(5), MAX_NUMBER_OF_RETRIES).
iterator();

@Override
public void onResponse(RetryAction.Response response) {
inProgressRetries.remove(failedStep);
}

@Override
public void onFailure(Exception e) {
String index = indexMetaData.getIndex().getName();
if (backoffIterator.hasNext()) {
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData);
// todo unify the mechanism to check if exceptions are retryable once we clarify if we'll use the metadata stuff
// todo it's important to test this again here as it might fail due to a different cause
if (ElasticsearchException.getExceptionName(e).equals(toUnderscoreCase(ClusterBlockException.class.getSimpleName()))) {
// TODO add jitter to the retry interval to avoid correlated spikes
TimeValue scheduleRetryInterval = backoffIterator.next();
logger.info("scheduling the retry of step [{}] as part of policy [{}] as it failed due to [{}]",
lifecycleState.getFailedStep(), policy, e.getMessage());
threadPool.scheduleWithFixedDelay(() -> retryFailedStep("ilm-retry-failed-step",
new RetryAction.Request(indexMetaData.getIndex().getName()), this),
scheduleRetryInterval, ThreadPool.Names.GENERIC);
} else {
logger.warn("policy [{} for index [{}] encountered a terminal error on step [{}], skipping execution", policy
, index, lifecycleState.getFailedStep());
}
} else {
logger.debug("policy [{}] for index [{}] on an error step after [{}] retries, skipping execution", policy, index,
MAX_NUMBER_OF_RETRIES);
}
}
};
}

void retryFailedStep(String source, RetryAction.Request request, ActionListener<RetryAction.Response> listener) {
clusterService.submitStateUpdateTask(source,
new AckedClusterStateUpdateTask<>(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return moveClusterStateToFailedStep(currentState, request.indices());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
for (String index : request.indices()) {
IndexMetaData idxMeta = newState.metaData().index(index);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey retryStep = new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(), lifecycleState.getStep());
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug("index [" + index + "] has been deleted after moving to step [" +
lifecycleState.getStep() + "], skipping async action check");
return;
}
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings());
maybeRunAsyncAction(newState, idxMeta, policyName, retryStep);
}
}

@Override
protected RetryAction.Response newResponse(boolean acknowledged) {
return new RetryAction.Response(acknowledged);
}
});
}

private boolean isFailedStepFailureRetryable(LifecycleExecutionState lifecycleState) {
try {
XContentParser parser =
JsonXContent.jsonXContent.createParser(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, lifecycleState.getStepInfo());
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
ElasticsearchException elasticsearchException = ElasticsearchException.fromXContent(parser);
String exceptionMessage = elasticsearchException.getDetailedMessage();
int typeIndex = exceptionMessage.indexOf("type=");
int endTypeIndex = exceptionMessage.indexOf(",", typeIndex);
if (typeIndex != -1 && endTypeIndex != -1) {
String exceptionType = exceptionMessage.substring(typeIndex + 5, endTypeIndex);
// todo whitelist a series of retryable exceptions (in underscore case)
if (exceptionType.equals(toUnderscoreCase(ClusterBlockException.class.getSimpleName()))) {
return true;
}
}
} catch (IOException e) {
// can't figure out what the exception is so we'll not retry
}
return false;
}

/**
* If the current step (matching the expected step key) is an asynchronous action step, run it
*/
@@ -219,7 +347,20 @@ public void runPolicyAfterStateChange(String policy, IndexMetaData indexMetaData
logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index);
return;
} else if (currentStep instanceof ErrorStep) {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
Step failedStep = stepRegistry.getStep(indexMetaData, new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(),
lifecycleState.getFailedStep()));
if (failedStep.isRetryable()) {
ActionListener<RetryAction.Response> listener = createRetryFailedStepListener(policy, indexMetaData, failedStep);
// todo the key in this map needs to have the index and possibly policy name as well (but hopefully we won't need this map)
inProgressRetries.put(failedStep, listener);
if (isFailedStepFailureRetryable(lifecycleState)) {
retryFailedStep("ilm-retry-failed-step", new RetryAction.Request(index), listener);
} else {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
}
} else {
logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index);
}
return;
}

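The whitelist of retryable exception types mentioned in the TODO inside isFailedStepFailureRetryable could take roughly the shape below. This is a sketch only: the field and helper names are assumptions, the set contents would need to be decided, and java.util.Set would additionally have to be imported.

// Illustrative sketch only: names are assumptions, not part of this change.
// Requires an additional import of java.util.Set.
private static final Set<String> RETRYABLE_EXCEPTION_TYPES = Set.of(
    toUnderscoreCase(ClusterBlockException.class.getSimpleName())
    // further retryable exception types would be whitelisted here, in underscore case
);

private static boolean isRetryableExceptionType(String exceptionType) {
    return RETRYABLE_EXCEPTION_TYPES.contains(exceptionType);
}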
@@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -33,6 +34,7 @@
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.action.RetryAction;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;

import java.io.Closeable;
@@ -310,6 +312,10 @@ public void submitOperationModeUpdate(OperationMode mode) {
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
}

public void retryFailedStep(String source, RetryAction.Request request, ActionListener<RetryAction.Response> listener) {
lifecycleRunner.retryFailedStep(source, request, listener);
}

/**
* Method that checks if the lifecycle state of the cluster service is stopped or closed. This
* enhances the readability of the code.
@@ -6,25 +6,19 @@

package org.elasticsearch.xpack.ilm.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.elasticsearch.xpack.core.ilm.action.RetryAction;
import org.elasticsearch.xpack.core.ilm.action.RetryAction.Request;
import org.elasticsearch.xpack.core.ilm.action.RetryAction.Response;
@@ -34,8 +28,6 @@

public class TransportRetryAction extends TransportMasterNodeAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportRetryAction.class);

IndexLifecycleService indexLifecycleService;

@Inject
@@ -58,34 +50,7 @@ protected Response read(StreamInput in) throws IOException {

@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
clusterService.submitStateUpdateTask("ilm-re-run",
new AckedClusterStateUpdateTask<Response>(request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return indexLifecycleService.moveClusterStateToFailedStep(currentState, request.indices());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
for (String index : request.indices()) {
IndexMetaData idxMeta = newState.metaData().index(index);
LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey retryStep = new StepKey(lifecycleState.getPhase(), lifecycleState.getAction(), lifecycleState.getStep());
if (idxMeta == null) {
// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug("index [" + index + "] has been deleted after moving to step [" +
lifecycleState.getStep() + "], skipping async action check");
return;
}
indexLifecycleService.maybeRunAsyncAction(newState, idxMeta, retryStep);
}
}

@Override
protected Response newResponse(boolean acknowledged) {
return new Response(acknowledged);
}
});
indexLifecycleService.retryFailedStep("ilm-re-run", request, listener);
}

@Override
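For completeness, the manual retry endpoint served by TransportRetryAction is unchanged from a caller's perspective; after this change the transport action simply delegates to IndexLifecycleService#retryFailedStep. A minimal usage sketch in the same low-level client style as the test, with a placeholder index name:

// Illustrative usage only; "my-index-000001" is a placeholder.
Request retryRequest = new Request("POST", "/my-index-000001/_ilm/retry");
client().performRequest(retryRequest);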