Skip to content

Commit f4ca615

Browse files
committed
Dedicated API for unbatched master tasks
Today when submitting a cluster state update task the caller can indicate that it is not to be included in a batch by using an executor obtained from `ClusterStateTaskExecutor#unbatched()`. This is kind of weird: the caller must not re-use the executor, and also the master service has no practical way to handle this special case any differently. This commit introduces a dedicated API for unbatched tasks, opening the door to some future simplifications. Extracted from elastic#85751
1 parent 4ba9ce7 commit f4ca615

File tree

125 files changed

+1093
-1274
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

125 files changed

+1093
-1274
lines changed

build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,5 @@ org.apache.logging.log4j.LogManager#getLogger()
139139
java.lang.String#formatted(java.lang.Object[]) @ Uses default locale - use String#format(Locale, String, Object...) instead
140140

141141
@defaultMessage Unbatched cluster state tasks are a source of performance and stability bugs. Implement the update logic in a executor which is reused across tasks instead.
142-
org.elasticsearch.cluster.ClusterStateTaskExecutor#unbatched()
142+
org.elasticsearch.cluster.service.MasterService#submitUnbatchedStateUpdateTask(java.lang.String, org.elasticsearch.cluster.ClusterStateUpdateTask)
143+
org.elasticsearch.cluster.service.ClusterService#submitUnbatchedStateUpdateTask(java.lang.String, org.elasticsearch.cluster.ClusterStateUpdateTask)

modules/data-streams/src/main/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeService.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.apache.logging.log4j.message.ParameterizedMessage;
1313
import org.elasticsearch.cluster.ClusterState;
14-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
1514
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1615
import org.elasticsearch.cluster.LocalNodeMasterListener;
1716
import org.elasticsearch.cluster.metadata.DataStream;
@@ -60,7 +59,7 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
6059
void perform(Runnable onComplete) {
6160
if (running.compareAndSet(false, true)) {
6261
LOGGER.debug("starting tsdb update task");
63-
clusterService.submitStateUpdateTask("update_tsdb_data_stream_end_times", new ClusterStateUpdateTask(Priority.URGENT) {
62+
submitUnbatchedTask("update_tsdb_data_stream_end_times", new ClusterStateUpdateTask(Priority.URGENT) {
6463
@Override
6564
public ClusterState execute(ClusterState currentState) throws Exception {
6665
return updateTimeSeriesTemporalRange(currentState, Instant.now());
@@ -79,15 +78,15 @@ public void onFailure(Exception e) {
7978
onComplete.run();
8079
}
8180

82-
}, newExecutor());
81+
});
8382
} else {
8483
LOGGER.debug("not starting tsdb update task, because another execution is still running");
8584
}
8685
}
8786

8887
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
89-
private static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> newExecutor() {
90-
return ClusterStateTaskExecutor.unbatched();
88+
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
89+
clusterService.submitUnbatchedStateUpdateTask(source, task);
9190
}
9291

9392
void setPollInterval(TimeValue newValue) {

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DeleteDataStreamTransportAction.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
1818
import org.elasticsearch.cluster.ClusterState;
19-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2019
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2120
import org.elasticsearch.cluster.block.ClusterBlockException;
2221
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -89,7 +88,7 @@ protected void masterOperation(
8988
systemIndices.validateDataStreamAccess(name, threadPool.getThreadContext());
9089
}
9190

92-
clusterService.submitStateUpdateTask(
91+
submitUnbatchedTask(
9392
"remove-data-stream [" + Strings.arrayToCommaDelimitedString(request.getNames()) + "]",
9493
new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) {
9594

@@ -113,14 +112,13 @@ public ClusterState execute(ClusterState currentState) {
113112
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
114113
listener.onResponse(AcknowledgedResponse.TRUE);
115114
}
116-
},
117-
newExecutor()
115+
}
118116
);
119117
}
120118

121119
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
122-
private static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> newExecutor() {
123-
return ClusterStateTaskExecutor.unbatched();
120+
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
121+
clusterService.submitUnbatchedStateUpdateTask(source, task);
124122
}
125123

126124
static ClusterState removeDataStream(

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/PromoteDataStreamTransportAction.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1515
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
1616
import org.elasticsearch.cluster.ClusterState;
17-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
1817
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1918
import org.elasticsearch.cluster.block.ClusterBlockException;
2019
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -64,7 +63,7 @@ protected void masterOperation(
6463
ActionListener<AcknowledgedResponse> listener
6564
) throws Exception {
6665
systemIndices.validateDataStreamAccess(request.getName(), threadPool.getThreadContext());
67-
clusterService.submitStateUpdateTask(
66+
submitUnbatchedTask(
6867
"promote-data-stream [" + request.getName() + "]",
6968
new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) {
7069

@@ -82,14 +81,13 @@ public ClusterState execute(ClusterState currentState) {
8281
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
8382
listener.onResponse(AcknowledgedResponse.TRUE);
8483
}
85-
},
86-
newExecutor()
84+
}
8785
);
8886
}
8987

9088
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
91-
private static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> newExecutor() {
92-
return ClusterStateTaskExecutor.unbatched();
89+
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
90+
clusterService.submitUnbatchedStateUpdateTask(source, task);
9391
}
9492

9593
static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2020
import org.elasticsearch.action.support.ActiveShardCount;
2121
import org.elasticsearch.cluster.ClusterState;
22-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2322
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2423
import org.elasticsearch.cluster.metadata.IndexMetadata;
2524
import org.elasticsearch.cluster.metadata.Metadata;
@@ -262,7 +261,7 @@ public void testMigrationWillRunAfterError() throws Exception {
262261
SetOnce<Exception> failure = new SetOnce<>();
263262
CountDownLatch clusterStateUpdated = new CountDownLatch(1);
264263
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
265-
.submitStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
264+
.submitUnbatchedStateUpdateTask(this.getTestName(), new ClusterStateUpdateTask() {
266265
@Override
267266
public ClusterState execute(ClusterState currentState) throws Exception {
268267
FeatureMigrationResults newResults = new FeatureMigrationResults(
@@ -287,7 +286,7 @@ public void onFailure(Exception e) {
287286
failure.set(e);
288287
clusterStateUpdated.countDown();
289288
}
290-
}, ClusterStateTaskExecutor.unbatched());
289+
});
291290

292291
clusterStateUpdated.await(10, TimeUnit.SECONDS); // Should be basically instantaneous
293292
if (failure.get() != null) {

qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.client.Request;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.cluster.ClusterState;
19-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2019
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2120
import org.elasticsearch.cluster.SimpleDiffable;
2221
import org.elasticsearch.cluster.service.ClusterService;
@@ -45,7 +44,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
4544

4645
private void updateClusterState(ClusterService clusterService, UnaryOperator<ClusterState> updateOperator) {
4746
final PlainActionFuture<Void> future = new PlainActionFuture<>();
48-
clusterService.submitStateUpdateTask("update state", new ClusterStateUpdateTask() {
47+
clusterService.submitUnbatchedStateUpdateTask("update state", new ClusterStateUpdateTask() {
4948
@Override
5049
public ClusterState execute(ClusterState currentState) {
5150
return updateOperator.apply(currentState);
@@ -60,7 +59,7 @@ public void onFailure(Exception e) {
6059
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
6160
future.onResponse(null);
6261
}
63-
}, ClusterStateTaskExecutor.unbatched());
62+
});
6463
future.actionGet();
6564
}
6665

qa/smoke-test-http/src/test/java/org/elasticsearch/http/RestGetMappingsCancellationIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1919
import org.elasticsearch.cluster.ClusterState;
20-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
2120
import org.elasticsearch.cluster.ack.AckedRequest;
2221
import org.elasticsearch.cluster.block.ClusterBlock;
2322
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -98,12 +97,12 @@ public TimeValue masterNodeTimeout() {
9897

9998
PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
10099
internalCluster().getAnyMasterNodeInstance(ClusterService.class)
101-
.submitStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
100+
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
102101
@Override
103102
public ClusterState execute(ClusterState currentState) throws Exception {
104103
return transformationFn.apply(currentState);
105104
}
106-
}, ClusterStateTaskExecutor.unbatched());
105+
});
107106

108107
future.actionGet();
109108
}

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportDesiredNodesActionsIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.ActionFuture;
1414
import org.elasticsearch.action.ActionResponse;
1515
import org.elasticsearch.cluster.ClusterState;
16-
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
1716
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1817
import org.elasticsearch.cluster.desirednodes.VersionConflictException;
1918
import org.elasticsearch.cluster.metadata.DesiredNodes;
@@ -373,7 +372,7 @@ private Runnable blockClusterStateUpdateThread() throws InterruptedException {
373372
final CountDownLatch unblockClusterStateUpdateTask = new CountDownLatch(1);
374373
final CountDownLatch blockingClusterStateUpdateTaskExecuting = new CountDownLatch(1);
375374
final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
376-
clusterService.submitStateUpdateTask("blocking-task", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
375+
clusterService.submitUnbatchedStateUpdateTask("blocking-task", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
377376
@Override
378377
public ClusterState execute(ClusterState currentState) throws Exception {
379378
blockingClusterStateUpdateTaskExecuting.countDown();
@@ -386,7 +385,7 @@ public void onFailure(Exception e) {
386385
blockingClusterStateUpdateTaskExecuting.countDown();
387386
assert false : e.getMessage();
388387
}
389-
}, ClusterStateTaskExecutor.unbatched());
388+
});
390389

391390
assertTrue(blockingClusterStateUpdateTaskExecuting.await(10, TimeUnit.SECONDS));
392391
return unblockClusterStateUpdateTask::countDown;

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterHealthIT.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void testWaitForEventsRetriesIfOtherConditionsNotMet() {
286286
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
287287
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
288288
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
289-
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
289+
clusterService.submitUnbatchedStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
290290
@Override
291291
public ClusterState execute(ClusterState currentState) {
292292
return currentState;
@@ -301,12 +301,12 @@ public void onFailure(Exception e) {
301301
@Override
302302
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
303303
if (keepSubmittingTasks.get()) {
304-
clusterService.submitStateUpdateTask("looping task", this, ClusterStateTaskExecutor.unbatched());
304+
clusterService.submitUnbatchedStateUpdateTask("looping task", this);
305305
} else {
306306
completionFuture.onResponse(null);
307307
}
308308
}
309-
}, ClusterStateTaskExecutor.unbatched());
309+
});
310310

311311
try {
312312
createIndex("index");
@@ -377,7 +377,7 @@ public void testWaitForEventsTimesOutIfMasterBusy() {
377377
final AtomicBoolean keepSubmittingTasks = new AtomicBoolean(true);
378378
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
379379
final PlainActionFuture<Void> completionFuture = new PlainActionFuture<>();
380-
clusterService.submitStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
380+
clusterService.submitUnbatchedStateUpdateTask("looping task", new ClusterStateUpdateTask(Priority.LOW) {
381381
@Override
382382
public ClusterState execute(ClusterState currentState) {
383383
return currentState;
@@ -392,12 +392,12 @@ public void onFailure(Exception e) {
392392
@Override
393393
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
394394
if (keepSubmittingTasks.get()) {
395-
clusterService.submitStateUpdateTask("looping task", this, ClusterStateTaskExecutor.unbatched());
395+
clusterService.submitUnbatchedStateUpdateTask("looping task", this);
396396
} else {
397397
completionFuture.onResponse(null);
398398
}
399399
}
400-
}, ClusterStateTaskExecutor.unbatched());
400+
});
401401

402402
try {
403403
final ClusterHealthResponse clusterHealthResponse = client().admin()

server/src/internalClusterTest/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ public void testCannotCommitStateThreeNodes() throws Exception {
333333
final AtomicReference<Exception> failure = new AtomicReference<>();
334334
logger.debug("--> submitting for cluster state to be rejected");
335335
final ClusterService masterClusterService = internalCluster().clusterService(master);
336-
masterClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
336+
masterClusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask() {
337337
@Override
338338
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
339339
latch.countDown();
@@ -355,7 +355,7 @@ public void onFailure(Exception e) {
355355
failure.set(e);
356356
latch.countDown();
357357
}
358-
}, ClusterStateTaskExecutor.unbatched());
358+
});
359359

360360
logger.debug("--> waiting for cluster state to be processed/rejected");
361361
latch.await();

0 commit comments

Comments
 (0)