Skip to content

Commit 388f754

Browse files
authored
Change step execution flow to be deliberate about type (#34126)
This commit changes the way that step execution flows. Rather than have any step run when the cluster state changes or the periodic scheduler fires, this now runs the different types of steps at different times. `AsyncWaitStep` is run in a periodic manner, i.e., every 10 minutes by default. `ClusterStateActionStep` and `ClusterStateWaitStep` are run every time the cluster state changes. `AsyncActionStep` is now run only after the cluster state has been transitioned into a new step. This prevents these non-idempotent steps from running at the same time. In addition to being run when transitioned into, this is also run when a node is newly elected master (only if set as the current step) so that master failover does not fail to run the step. This also changes the `RolloverStep` from an `AsyncActionStep` to an `AsyncWaitStep` so that it can run periodically. Relates to #29823
1 parent fb90770 commit 388f754

File tree

15 files changed

+630
-297
lines changed

15 files changed

+630
-297
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
package org.elasticsearch.xpack.core.indexlifecycle;
77

88
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.cluster.metadata.IndexMetaData;
910
import org.elasticsearch.common.xcontent.ToXContentObject;
10-
import org.elasticsearch.index.Index;
1111

1212
public abstract class AsyncWaitStep extends Step {
1313

@@ -22,7 +22,7 @@ protected Client getClient() {
2222
return client;
2323
}
2424

25-
public abstract void evaluateCondition(Index index, Listener listener);
25+
public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener);
2626

2727
public interface Listener {
2828

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,9 @@ public List<Step> toSteps(Client client) {
219219
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));
220220

221221
Collections.reverse(steps);
222-
logger.debug("STEP COUNT: " + steps.size());
222+
logger.trace("STEP COUNT: " + steps.size());
223223
for (Step step : steps) {
224-
logger.debug(step.getKey() + " -> " + step.getNextStepKey());
224+
logger.trace(step.getKey() + " -> " + step.getNextStepKey());
225225
}
226226

227227
return steps;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1010
import org.elasticsearch.client.Client;
11-
import org.elasticsearch.cluster.ClusterState;
1211
import org.elasticsearch.cluster.metadata.IndexMetaData;
1312
import org.elasticsearch.common.Strings;
1413
import org.elasticsearch.common.unit.ByteSizeValue;
1514
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.xcontent.ToXContentObject;
16+
import org.elasticsearch.common.xcontent.XContentBuilder;
1617

18+
import java.io.IOException;
1719
import java.util.Locale;
1820
import java.util.Objects;
1921

20-
public class RolloverStep extends AsyncActionStep {
22+
public class RolloverStep extends AsyncWaitStep {
2123
public static final String NAME = "attempt_rollover";
2224

2325
private ByteSizeValue maxSize;
@@ -33,7 +35,7 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client, ByteSizeVal
3335
}
3436

3537
@Override
36-
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
38+
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
3739
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());
3840

3941
if (Strings.isNullOrEmpty(rolloverAlias)) {
@@ -54,7 +56,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState
5456
rolloverRequest.addMaxIndexDocsCondition(maxDocs);
5557
}
5658
getClient().admin().indices().rolloverIndex(rolloverRequest,
57-
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver()), listener::onFailure));
59+
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure));
5860
}
5961

6062
ByteSizeValue getMaxSize() {
@@ -89,4 +91,13 @@ public boolean equals(Object obj) {
8991
Objects.equals(maxDocs, other.maxDocs);
9092
}
9193

94+
// We currently have no information to provide for this AsyncWaitStep, so this is an empty object
95+
private class EmptyInfo implements ToXContentObject {
96+
private EmptyInfo() {}
97+
98+
@Override
99+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
100+
return builder;
101+
}
102+
}
92103
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
1010
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
1112
import org.elasticsearch.common.ParseField;
1213
import org.elasticsearch.common.Strings;
1314
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
1415
import org.elasticsearch.common.xcontent.ToXContentObject;
1516
import org.elasticsearch.common.xcontent.XContentBuilder;
16-
import org.elasticsearch.index.Index;
1717

1818
import java.io.IOException;
1919
import java.util.Arrays;
@@ -38,12 +38,14 @@ public int getMaxNumSegments() {
3838
}
3939

4040
@Override
41-
public void evaluateCondition(Index index, Listener listener) {
42-
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
43-
long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false)
44-
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
45-
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
46-
}, listener::onFailure));
41+
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
42+
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
43+
ActionListener.wrap(response -> {
44+
long numberShardsLeftToMerge =
45+
StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false)
46+
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
47+
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
48+
}, listener::onFailure));
4749
}
4850

4951
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.elasticsearch.common.unit.ByteSizeUnit;
2222
import org.elasticsearch.common.unit.ByteSizeValue;
2323
import org.elasticsearch.common.unit.TimeValue;
24-
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
24+
import org.elasticsearch.common.xcontent.ToXContentObject;
2525
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
2626
import org.junit.Before;
2727
import org.mockito.Mockito;
@@ -148,10 +148,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
148148
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
149149

150150
SetOnce<Boolean> actionCompleted = new SetOnce<>();
151-
step.performAction(indexMetaData, null, new Listener() {
151+
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
152152

153153
@Override
154-
public void onResponse(boolean complete) {
154+
public void onResponse(boolean complete, ToXContentObject obj) {
155155
actionCompleted.set(complete);
156156
}
157157

@@ -205,10 +205,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
205205
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
206206

207207
SetOnce<Boolean> actionCompleted = new SetOnce<>();
208-
step.performAction(indexMetaData, null, new Listener() {
208+
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
209209

210210
@Override
211-
public void onResponse(boolean complete) {
211+
public void onResponse(boolean complete, ToXContentObject obj) {
212212
actionCompleted.set(complete);
213213
}
214214

@@ -263,10 +263,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
263263
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
264264

265265
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
266-
step.performAction(indexMetaData, null, new Listener() {
266+
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
267267

268268
@Override
269-
public void onResponse(boolean complete) {
269+
public void onResponse(boolean complete, ToXContentObject obj) {
270270
throw new AssertionError("Unexpected method call");
271271
}
272272

@@ -292,9 +292,9 @@ public void testPerformActionInvalidNullOrEmptyAlias() {
292292
RolloverStep step = createRandomInstance();
293293

294294
SetOnce<Exception> exceptionThrown = new SetOnce<>();
295-
step.performAction(indexMetaData, null, new Listener() {
295+
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
296296
@Override
297-
public void onResponse(boolean complete) {
297+
public void onResponse(boolean complete, ToXContentObject obj) {
298298
throw new AssertionError("Unexpected method call");
299299
}
300300

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.core.indexlifecycle;
77

88
import org.apache.lucene.util.SetOnce;
9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
1112
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
@@ -14,6 +15,8 @@
1415
import org.elasticsearch.client.AdminClient;
1516
import org.elasticsearch.client.Client;
1617
import org.elasticsearch.client.IndicesAdminClient;
18+
import org.elasticsearch.cluster.metadata.IndexMetaData;
19+
import org.elasticsearch.common.settings.Settings;
1720
import org.elasticsearch.common.xcontent.ToXContentObject;
1821
import org.elasticsearch.index.Index;
1922
import org.elasticsearch.index.engine.Segment;
@@ -41,6 +44,15 @@ public SegmentCountStep createRandomInstance() {
4144
return new SegmentCountStep(stepKey, nextStepKey, null, maxNumSegments);
4245
}
4346

47+
private IndexMetaData makeMeta(Index index) {
48+
return IndexMetaData.builder(index.getName())
49+
.settings(Settings.builder()
50+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
51+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
52+
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
53+
.build();
54+
}
55+
4456
@Override
4557
public SegmentCountStep mutateInstance(SegmentCountStep instance) {
4658
StepKey key = instance.getKey();
@@ -109,7 +121,7 @@ public void testIsConditionMet() {
109121
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
110122

111123
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
112-
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
124+
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
113125
@Override
114126
public void onResponse(boolean conditionMet, ToXContentObject info) {
115127
conditionMetResult.set(conditionMet);
@@ -166,7 +178,7 @@ public void testIsConditionFails() {
166178
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();
167179

168180
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
169-
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
181+
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
170182
@Override
171183
public void onResponse(boolean conditionMet, ToXContentObject info) {
172184
conditionMetResult.set(conditionMet);
@@ -206,7 +218,7 @@ public void testThrowsException() {
206218
SetOnce<Boolean> exceptionThrown = new SetOnce<>();
207219

208220
SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
209-
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
221+
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
210222
@Override
211223
public void onResponse(boolean conditionMet, ToXContentObject info) {
212224
throw new AssertionError("unexpected method call");

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
1919
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
2020
import org.elasticsearch.xpack.core.indexlifecycle.Step;
21+
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
2122

2223
import java.io.IOException;
2324
import java.util.function.LongSupplier;
@@ -28,15 +29,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
2829
private final Index index;
2930
private final Step startStep;
3031
private final PolicyStepsRegistry policyStepsRegistry;
32+
private final IndexLifecycleRunner lifecycleRunner;
3133
private LongSupplier nowSupplier;
34+
private Step.StepKey nextStepKey = null;
3235

3336
public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
34-
LongSupplier nowSupplier) {
37+
IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) {
3538
this.policy = policy;
3639
this.index = index;
3740
this.startStep = startStep;
3841
this.policyStepsRegistry = policyStepsRegistry;
3942
this.nowSupplier = nowSupplier;
43+
this.lifecycleRunner = lifecycleRunner;
4044
}
4145

4246
String getPolicy() {
@@ -63,7 +67,7 @@ Step getStartStep() {
6367
* @throws IOException if any exceptions occur
6468
*/
6569
@Override
66-
public ClusterState execute(ClusterState currentState) throws IOException {
70+
public ClusterState execute(final ClusterState currentState) throws IOException {
6771
Step currentStep = startStep;
6872
IndexMetaData indexMetaData = currentState.metaData().index(index);
6973
if (indexMetaData == null) {
@@ -74,22 +78,24 @@ public ClusterState execute(ClusterState currentState) throws IOException {
7478
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData,
7579
LifecycleExecutionState.fromIndexMetadata(indexMetaData));
7680
if (currentStep.equals(registeredCurrentStep)) {
81+
ClusterState state = currentState;
7782
// We can do cluster state steps all together until we
7883
// either get to a step that isn't a cluster state step or a
7984
// cluster state wait step returns not completed
8085
while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
86+
nextStepKey = currentStep.getNextStepKey();
8187
if (currentStep instanceof ClusterStateActionStep) {
8288
// cluster state action step so do the action and
83-
// move
84-
// the cluster state to the next step
89+
// move the cluster state to the next step
8590
logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]",
8691
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
87-
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
92+
state = ((ClusterStateActionStep) currentStep).performAction(index, state);
8893
if (currentStep.getNextStepKey() == null) {
89-
return currentState;
90-
}
91-
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
94+
return state;
95+
} else {
96+
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
9297
currentStep.getNextStepKey(), nowSupplier);
98+
}
9399
} else {
94100
// cluster state wait step so evaluate the
95101
// condition, if the condition is met move to the
@@ -99,29 +105,34 @@ public ClusterState execute(ClusterState currentState) throws IOException {
99105
// condition again
100106
logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]",
101107
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
102-
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
108+
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
103109
if (result.isComplete()) {
104110
if (currentStep.getNextStepKey() == null) {
105-
return currentState;
106-
}
107-
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
111+
return state;
112+
} else {
113+
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
108114
currentStep.getNextStepKey(), nowSupplier);
115+
}
109116
} else {
110-
logger.debug("condition not met, returning existing state");
117+
logger.debug("[{}] condition not met ({}), returning existing state", index.getName(), currentStep.getKey());
111118
ToXContentObject stepInfo = result.getInfomationContext();
112119
if (stepInfo == null) {
113-
return currentState;
120+
return state;
114121
} else {
115-
return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo);
122+
return IndexLifecycleRunner.addStepInfoToClusterState(index, state, stepInfo);
116123
}
117124
}
118125
}
126+
// There are actions we need to take in the event a phase
127+
// transition happens, so even if we would continue in the while
128+
// loop, if we are about to go into a new phase, return so that
129+
// other processing can occur
119130
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
120-
return currentState;
131+
return state;
121132
}
122133
currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey());
123134
}
124-
return currentState;
135+
return state;
125136
} else {
126137
// either we are no longer the master or the step is now
127138
// not the same as when we submitted the update task. In
@@ -130,6 +141,19 @@ public ClusterState execute(ClusterState currentState) throws IOException {
130141
}
131142
}
132143

144+
@Override
145+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
146+
if (oldState.equals(newState) == false) {
147+
IndexMetaData indexMetaData = newState.metaData().index(index);
148+
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
149+
// After the cluster state has been processed and we have moved
150+
// to a new step, we need to conditionally execute the step iff
151+
// it is an `AsyncAction` so that it is executed exactly once.
152+
lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey);
153+
}
154+
}
155+
}
156+
133157
@Override
134158
public void onFailure(String source, Exception e) {
135159
throw new ElasticsearchException(

0 commit comments

Comments
 (0)