Skip to content

Simple ILM Task Batching Implementation #78547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.xpack.ilm;

import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -139,9 +141,19 @@ public void testWaitInShrunkShardsAllocatedExceedsThreshold() throws Exception {
// shrink cycle is started
LongSupplier nowWayBackInThePastSupplier = () -> 1234L;
clusterService.submitStateUpdateTask("testing-move-to-step-to-manipulate-step-time",
new MoveToNextStepUpdateTask(managedIndexMetadata.getIndex(), policy, currentStepKey, currentStepKey,
nowWayBackInThePastSupplier, indexLifecycleService.getPolicyRegistry(), state -> {
}));
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return new MoveToNextStepUpdateTask(managedIndexMetadata.getIndex(), policy, currentStepKey, currentStepKey,
nowWayBackInThePastSupplier, indexLifecycleService.getPolicyRegistry(), state -> {
}).execute(currentState);
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}
});

String[] secondCycleShrinkIndexName = new String[1];
assertBusy(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Step.StepKey getNextStepKey() {
* @throws IOException if any exceptions occur
*/
@Override
public ClusterState execute(final ClusterState currentState) throws IOException {
public ClusterState doExecute(final ClusterState currentState) throws IOException {
Step currentStep = startStep;
IndexMetadata indexMetadata = currentState.metadata().index(index);
if (indexMetadata == null) {
Expand Down Expand Up @@ -172,25 +172,23 @@ public ClusterState execute(final ClusterState currentState) throws IOException

@Override
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) {
IndexMetadata indexMetadata = newState.metadata().index(index);
if (indexMetadata != null) {

LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) {
lifecycleRunner.registerFailedOperation(indexMetadata, failure);
} else {
lifecycleRunner.registerSuccessfulOperation(indexMetadata);
}
IndexMetadata indexMetadata = newState.metadata().index(index);
if (indexMetadata != null) {

LifecycleExecutionState exState = LifecycleExecutionState.fromIndexMetadata(indexMetadata);
if (ErrorStep.NAME.equals(exState.getStep()) && this.failure != null) {
lifecycleRunner.registerFailedOperation(indexMetadata, failure);
} else {
lifecycleRunner.registerSuccessfulOperation(indexMetadata);
}

if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action",
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
logger.trace("[{}] step sequence starting with {} has completed, running next step {} if it is an async action",
index.getName(), startStep.getKey(), nextStepKey);
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetadata, policy, nextStepKey);
}
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetadata, policy, nextStepKey);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.Step;
Expand All @@ -18,7 +18,7 @@
* Base class for index lifecycle cluster state update tasks that requires implementing {@code equals} and {@code hashCode} to allow
* for these tasks to be deduplicated by {@link IndexLifecycleRunner}.
*/
public abstract class IndexLifecycleClusterStateUpdateTask extends ClusterStateUpdateTask {
public abstract class IndexLifecycleClusterStateUpdateTask implements ClusterStateTaskListener {

private final ListenableFuture<Void> listener = new ListenableFuture<>();

Expand All @@ -39,10 +39,25 @@ final Step.StepKey getCurrentStepKey() {
return currentStepKey;
}

// Flipped to true by execute(ClusterState) once doExecute returned a state instance different from its input;
// clusterStateProcessed consults it before invoking onClusterStateProcessed.
private boolean executed;

/**
 * Runs {@link #doExecute(ClusterState)} against the given state and remembers whether it produced a new
 * cluster state instance.
 *
 * @param currentState the cluster state to apply this task to
 * @return the state returned by {@code doExecute}; the very same instance as {@code currentState} when the task was a no-op
 * @throws Exception whatever {@code doExecute} throws
 */
public final ClusterState execute(ClusterState currentState) throws Exception {
    assert executed == false;
    final ClusterState result = doExecute(currentState);
    if (result == currentState) {
        // No-op: leave the flag untouched so the processed hook is skipped later.
        return result;
    }
    executed = true;
    return result;
}

/**
 * Computes the cluster state that results from applying this task to {@code currentState}. Called by
 * {@link #execute(ClusterState)}, which treats a return value that is the same instance as
 * {@code currentState} as a no-op.
 */
protected abstract ClusterState doExecute(ClusterState currentState) throws Exception;

/**
 * Completes the internal listener and then forwards to
 * {@link #onClusterStateProcessed(String, ClusterState, ClusterState)}, but only when
 * {@link #execute(ClusterState)} actually changed the cluster state.
 *
 * @param source   the source string the task was submitted with
 * @param oldState the cluster state before the update
 * @param newState the cluster state after the update
 */
@Override
public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    // The listener is always resolved, even for no-op executions.
    listener.onResponse(null);
    // The subclass hook must run at most once, and never for a no-op execution.
    if (executed) {
        onClusterStateProcessed(source, oldState, newState);
    }
}

@Override
Expand All @@ -61,8 +76,11 @@ public final void addListener(ActionListener<Void> listener) {
}

/**
 * This method is functionally the same as {@link ClusterStateTaskListener#clusterStateProcessed(String, ClusterState, ClusterState)}
 * and implementations can override it as they would override {@code ClusterStateUpdateTask#clusterStateProcessed}.
 * The only difference from {@code ClusterStateUpdateTask#clusterStateProcessed} is that if the {@link #execute(ClusterState)}
 * implementation was a noop and returned the input cluster state, then this method will not be invoked. It is therefore guaranteed
 * that {@code oldState} is always different from {@code newState}.
 * The default implementation does nothing.
 */
protected void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
Expand All @@ -74,8 +92,8 @@ protected void onClusterStateProcessed(String source, ClusterState oldState, Clu
public abstract int hashCode();

/**
 * This method is functionally the same as {@link ClusterStateTaskListener#onFailure(String, Exception)} and implementations can
 * override it as they would override {@code ClusterStateUpdateTask#onFailure}.
 */
protected abstract void handleFailure(String source, Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -53,6 +55,21 @@ class IndexLifecycleRunner {
private final ILMHistoryStore ilmHistoryStore;
private final LongSupplier nowSupplier;

/**
 * Executor that applies a batch of ILM tasks one after another. Each task runs against the state left by
 * the tasks before it; a task that throws is recorded as failed and does not alter the accumulated state,
 * while the remaining tasks still run.
 */
private static final ClusterStateTaskExecutor<IndexLifecycleClusterStateUpdateTask> ILM_TASK_EXECUTOR = (currentState, tasks) -> {
    final ClusterStateTaskExecutor.ClusterTasksResult.Builder<IndexLifecycleClusterStateUpdateTask> results =
        ClusterStateTaskExecutor.ClusterTasksResult.builder();
    // Running state threaded through the batch; only reassigned when a task completes without throwing.
    ClusterState accumulated = currentState;
    for (final IndexLifecycleClusterStateUpdateTask pending : tasks) {
        try {
            accumulated = pending.execute(accumulated);
            results.success(pending);
        } catch (Exception failure) {
            results.failure(pending, failure);
        }
    }
    return results.build(accumulated);
};

IndexLifecycleRunner(PolicyStepsRegistry stepRegistry, ILMHistoryStore ilmHistoryStore, ClusterService clusterService,
ThreadPool threadPool, LongSupplier nowSupplier) {
this.stepRegistry = stepRegistry;
Expand Down Expand Up @@ -522,6 +539,8 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
*/
private final Set<Tuple<Index, StepKey>> busyIndices = Collections.synchronizedSet(new HashSet<>());

// Task configuration (normal priority) handed to ClusterService#submitStateUpdateTask together with ILM_TASK_EXECUTOR.
// Not private: tests reference it as IndexLifecycleRunner.ILM_TASK_CONFIG when verifying submissions.
static final ClusterStateTaskConfig ILM_TASK_CONFIG = ClusterStateTaskConfig.build(Priority.NORMAL);

/**
* Tracks already executing {@link IndexLifecycleClusterStateUpdateTask} tasks in {@link #executingTasks} to prevent queueing up
* duplicate cluster state updates.
Expand All @@ -541,7 +560,7 @@ private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterState
busyIndices.remove(dedupKey);
assert removed : "tried to unregister unknown task [" + task + "]";
}));
clusterService.submitStateUpdateTask(source, task);
clusterService.submitStateUpdateTask(source, task, ILM_TASK_CONFIG, ILM_TASK_EXECUTOR, task);
} else {
logger.trace("skipped redundant execution of [{}]", source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey current
}

@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState doExecute(ClusterState currentState) {
IndexMetadata indexMetadata = currentState.getMetadata().index(index);
if (indexMetadata == null) {
// Index must have been since deleted, ignore it
Expand All @@ -64,9 +64,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    // The base class only invokes this hook when execute() returned a changed state, so no
    // oldState/newState comparison is needed and the consumer is notified exactly once.
    stateChangeConsumer.accept(newState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ ToXContentObject getStepInfo() {
}

@Override
public ClusterState execute(ClusterState currentState) throws IOException {
protected ClusterState doExecute(ClusterState currentState) throws IOException {
IndexMetadata idxMeta = currentState.getMetadata().index(index);
if (idxMeta == null) {
// Index must have been since deleted, ignore it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ private IndexMetadata setupIndexPolicy(String policyName) {
return indexMetadata;
}

public void testNeverExecuteNonClusterStateStep() throws IOException {
public void testNeverExecuteNonClusterStateStep() throws Exception {
setStateToKey(thirdStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetadata, thirdStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
assertThat(task.execute(clusterState), sameInstance(clusterState));
}

public void testSuccessThenFailureUnsetNextKey() throws IOException {
public void testSuccessThenFailureUnsetNextKey() throws Exception {
secondStep.setWillComplete(false);
setStateToKey(firstStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetadata, firstStepKey);
Expand All @@ -169,7 +169,7 @@ public void testSuccessThenFailureUnsetNextKey() throws IOException {
assertThat(lifecycleState.getStepInfo(), nullValue());
}

public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
public void testExecuteUntilFirstNonClusterStateStep() throws Exception {
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey);
long now = randomNonNegativeLong();
Expand All @@ -185,7 +185,7 @@ public void testExecuteUntilFirstNonClusterStateStep() throws IOException {
assertThat(lifecycleState.getStepInfo(), nullValue());
}

public void testExecuteInvalidStartStep() throws IOException {
public void testExecuteInvalidStartStep() throws Exception {
// Unset the index's phase/action/step to simulate starting from scratch
LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(
LifecycleExecutionState.fromIndexMetadata(clusterState.getMetadata().index(index)));
Expand All @@ -207,7 +207,7 @@ public void testExecuteInvalidStartStep() throws IOException {
assertSame(newState, clusterState);
}

public void testExecuteIncompleteWaitStepNoInfo() throws IOException {
public void testExecuteIncompleteWaitStepNoInfo() throws Exception {
secondStep.setWillComplete(false);
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(indexMetadata, secondStepKey);
Expand All @@ -224,7 +224,7 @@ public void testExecuteIncompleteWaitStepNoInfo() throws IOException {
assertThat(lifecycleState.getStepInfo(), nullValue());
}

public void testExecuteIncompleteWaitStepWithInfo() throws IOException {
public void testExecuteIncompleteWaitStepWithInfo() throws Exception {
secondStep.setWillComplete(false);
RandomStepInfo stepInfo = new RandomStepInfo(() -> randomAlphaOfLength(10));
secondStep.expectedInfo(stepInfo);
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testOnFailure() throws IOException {
task.onFailure(randomAlphaOfLength(10), expectedException);
}

public void testClusterActionStepThrowsException() throws IOException {
public void testClusterActionStepThrowsException() throws Exception {
RuntimeException thrownException = new RuntimeException("error");
firstStep.setException(thrownException);
setStateToKey(firstStepKey);
Expand All @@ -272,7 +272,7 @@ public void testClusterActionStepThrowsException() throws IOException {
containsString("{\"type\":\"runtime_exception\",\"reason\":\"error\",\"stack_trace\":\""));
}

public void testClusterWaitStepThrowsException() throws IOException {
public void testClusterWaitStepThrowsException() throws Exception {
RuntimeException thrownException = new RuntimeException("error");
secondStep.setException(thrownException);
setStateToKey(firstStepKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -164,7 +166,7 @@ public void testRunPolicyPhaseCompleteWithMoreStepsPolicyStep() {
runner.runPolicyAfterStateChange(policyName, indexMetadata);
runner.runPeriodicStep(policyName, Metadata.builder().put(indexMetadata, true).build(), indexMetadata);

Mockito.verify(clusterService, times(1)).submitStateUpdateTask(any(), any());
Mockito.verify(clusterService, times(1)).submitStateUpdateTask(anyString(), any(), any(), any(), any());
}

public void testRunPolicyErrorStep() {
Expand Down Expand Up @@ -626,10 +628,15 @@ public void testRunPolicyClusterStateActionStep() {

runner.runPolicyAfterStateChange(policyName, indexMetadata);

final ExecuteStepsUpdateTaskMatcher taskMatcher =
new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step);
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
"\"name\":\"cluster_state_action_step\"} => null]"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
Mockito.argThat(taskMatcher),
eq(IndexLifecycleRunner.ILM_TASK_CONFIG),
any(),
Mockito.argThat(taskMatcher)
);
Mockito.verifyNoMoreInteractions(clusterService);
}
Expand All @@ -646,10 +653,15 @@ public void testRunPolicyClusterStateWaitStep() {

runner.runPolicyAfterStateChange(policyName, indexMetadata);

final ExecuteStepsUpdateTaskMatcher taskMatcher =
new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step);
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
"\"name\":\"cluster_state_action_step\"} => null]"),
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
Mockito.argThat(taskMatcher),
eq(IndexLifecycleRunner.ILM_TASK_CONFIG),
any(),
Mockito.argThat(taskMatcher)
);
Mockito.verifyNoMoreInteractions(clusterService);
}
Expand Down Expand Up @@ -699,16 +711,20 @@ public void testRunPolicyThatDoesntExist() {
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
// verify that no exception is thrown
runner.runPolicyAfterStateChange(policyName, indexMetadata);
final SetStepInfoUpdateTaskMatcher taskMatcher = new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null,
(builder, params) -> {
builder.startObject();
builder.field("reason", "policy [does_not_exist] does not exist");
builder.field("type", "illegal_argument_exception");
builder.endObject();
return builder;
});
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
Mockito.eq("ilm-set-step-info {policy [cluster_state_action_policy], index [my_index], currentStep [null]}"),
Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null,
(builder, params) -> {
builder.startObject();
builder.field("reason", "policy [does_not_exist] does not exist");
builder.field("type", "illegal_argument_exception");
builder.endObject();
return builder;
}))
Mockito.argThat(taskMatcher),
eq(IndexLifecycleRunner.ILM_TASK_CONFIG),
any(),
Mockito.argThat(taskMatcher)
);
Mockito.verifyNoMoreInteractions(clusterService);
}
Expand Down
Loading