Skip to content

Commit 7e6c828

Browse files
original-brownbearprobakowski
authored andcommitted
Add new Threadpool for Cluster Coordination Activities (elastic#83576)
Using the generic pool for these all the activities surrounding cluster coordination seems broken. They all get blocked on the same mutex in the `Coordinator` eventually and run effectively sequentially. Particularly in clusters with larger node counts this could lead to situations where lots of generic threads are needlessly spun up only for the purpose of waiting on the mutex. Since we at times also lock on the mutex in the coordinator on transport threads, it is particularly unforuntate when there's lots of generic threads waiting on it. => fix this by using a single threaded pool for coordination work.
1 parent ea95ac9 commit 7e6c828

File tree

10 files changed

+30
-22
lines changed

10 files changed

+30
-22
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private boolean isActive() {
8484
}
8585

8686
void scheduleNextWarning() {
87-
threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() {
87+
threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.CLUSTER_COORDINATION, new AbstractRunnable() {
8888
@Override
8989
public void onFailure(Exception e) {
9090
logger.debug("unexpected exception scheduling cluster formation warning", e);

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ private void abdicateTo(DiscoveryNode newMaster) {
497497
final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
498498
logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
499499
getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
500-
// handling of start join messages on the local node will be dispatched to the generic thread-pool
500+
// handling of start join messages on the local node will be dispatched to the coordination thread-pool
501501
assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
502502
// explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
503503
becomeCandidate("after abdicating to " + newMaster);
@@ -635,7 +635,7 @@ private void sendJoinValidate(DiscoveryNode discoveryNode, ClusterState clusterS
635635
new ActionListenerResponseHandler<>(listener.delegateResponse((l, e) -> {
636636
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", discoveryNode), e);
637637
listener.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
638-
}), i -> Empty.INSTANCE, Names.GENERIC)
638+
}), i -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
639639
);
640640
}
641641

@@ -651,7 +651,7 @@ private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.T
651651
e
652652
);
653653
listener.onFailure(new IllegalStateException("failure when sending a join ping request to node", e));
654-
}), i -> Empty.INSTANCE, Names.GENERIC)
654+
}), i -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
655655
);
656656
}
657657

@@ -1583,7 +1583,7 @@ public void run() {
15831583
public String toString() {
15841584
return "scheduled timeout for " + CoordinatorPublication.this;
15851585
}
1586-
}, publishTimeout, Names.GENERIC);
1586+
}, publishTimeout, Names.CLUSTER_COORDINATION);
15871587

15881588
this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
15891589
@Override
@@ -1597,7 +1597,7 @@ public void run() {
15971597
public String toString() {
15981598
return "scheduled timeout for reporting on " + CoordinatorPublication.this;
15991599
}
1600-
}, publishInfoTimeout, Names.GENERIC);
1600+
}, publishInfoTimeout, Names.CLUSTER_COORDINATION);
16011601
}
16021602

16031603
private void removePublicationAndPossiblyBecomeCandidate(String reason) {

server/src/main/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ void scheduleNextElection(final TimeValue gracePeriod, final Runnable scheduledR
172172
final long maxDelayMillis = Math.min(maxTimeout.millis(), initialTimeout.millis() + thisAttempt * backoffTime.millis());
173173
final long delayMillis = toPositiveLongAtMost(random.nextLong(), maxDelayMillis) + gracePeriod.millis();
174174
final Runnable runnable = new AbstractRunnable() {
175+
176+
@Override
177+
public void onRejection(Exception e) {
178+
logger.debug("threadpool was shut down", e);
179+
}
180+
175181
@Override
176182
public void onFailure(Exception e) {
177183
logger.debug(new ParameterizedMessage("unexpected exception in wakeup of {}", this), e);
@@ -206,7 +212,7 @@ public String toString() {
206212
};
207213

208214
logger.debug("scheduling {}", runnable);
209-
threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), Names.GENERIC, runnable);
215+
threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), Names.CLUSTER_COORDINATION, runnable);
210216
}
211217

212218
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel
193193
throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this);
194194
}
195195

196-
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
196+
transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
197197
@Override
198198
protected void doRun() throws IOException {
199199
logger.trace("responding to {} on slow path", request);
@@ -358,7 +358,7 @@ public void handleException(TransportException exp) {
358358
}
359359

360360
void failNode(String reason) {
361-
transportService.getThreadPool().generic().execute(new Runnable() {
361+
transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() {
362362
@Override
363363
public void run() {
364364
synchronized (mutex) {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join
129129

130130
transportService.registerRequestHandler(
131131
JOIN_ACTION_NAME,
132-
ThreadPool.Names.GENERIC,
132+
Names.CLUSTER_COORDINATION,
133133
false,
134134
false,
135135
JoinRequest::new,
@@ -141,7 +141,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join
141141

142142
transportService.registerRequestHandler(
143143
START_JOIN_ACTION_NAME,
144-
Names.GENERIC,
144+
Names.CLUSTER_COORDINATION,
145145
false,
146146
false,
147147
StartJoinRequest::new,
@@ -164,7 +164,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join
164164
final List<String> dataPaths = Environment.PATH_DATA_SETTING.get(settings);
165165
transportService.registerRequestHandler(
166166
JOIN_VALIDATE_ACTION_NAME,
167-
ThreadPool.Names.GENERIC,
167+
ThreadPool.Names.CLUSTER_COORDINATION,
168168
ValidateJoinRequest::new,
169169
(request, channel, task) -> {
170170
final ClusterState localState = currentStateSupplier.get();

server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void startLagDetector(final long version) {
105105
} else {
106106
logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);
107107

108-
threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {
108+
threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.CLUSTER_COORDINATION, new Runnable() {
109109
@Override
110110
public void run() {
111111
laggingTrackers.forEach(t -> t.checkForLag(version));

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public void handleException(TransportException exp) {
331331

332332
void leaderFailed(MessageSupplier messageSupplier, Exception e) {
333333
if (isClosed.compareAndSet(false, true)) {
334-
transportService.getThreadPool().generic().execute(new Runnable() {
334+
transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() {
335335
@Override
336336
public void run() {
337337
leaderFailureListener.onLeaderFailure(messageSupplier, e);
@@ -423,7 +423,7 @@ interface LeaderFailureListener {
423423
/**
424424
* Called when a leader failure is detected. Checking the leader health is somewhat asynchronous, so this method may report a leader
425425
* failure after the node has already decided there's no known leader for some other reason. This method is called on the {@code
426-
* GENERIC} thread pool.
426+
* COORDINATION} thread pool.
427427
*
428428
* @param messageSupplier The message to log if prior to this failure there was a known master in the cluster.
429429
* @param exception An exception that gives more detail of the leader failure.

server/src/main/java/org/elasticsearch/cluster/coordination/PreVoteCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class PreVoteCollector {
6363

6464
transportService.registerRequestHandler(
6565
REQUEST_PRE_VOTE_ACTION_NAME,
66-
Names.GENERIC,
66+
Names.CLUSTER_COORDINATION,
6767
false,
6868
false,
6969
PreVoteRequest::new,
@@ -174,7 +174,7 @@ public void handleException(TransportException exp) {
174174

175175
@Override
176176
public String executor() {
177-
return Names.GENERIC;
177+
return Names.CLUSTER_COORDINATION;
178178
}
179179

180180
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public PublicationTransportHandler(
9696

9797
transportService.registerRequestHandler(
9898
PUBLISH_STATE_ACTION_NAME,
99-
ThreadPool.Names.GENERIC,
99+
ThreadPool.Names.CLUSTER_COORDINATION,
100100
false,
101101
false,
102102
BytesTransportRequest::new,
@@ -105,7 +105,7 @@ public PublicationTransportHandler(
105105

106106
transportService.registerRequestHandler(
107107
COMMIT_STATE_ACTION_NAME,
108-
ThreadPool.Names.GENERIC,
108+
ThreadPool.Names.CLUSTER_COORDINATION,
109109
false,
110110
false,
111111
ApplyCommitRequest::new,
@@ -383,7 +383,7 @@ public void sendApplyCommit(
383383
COMMIT_STATE_ACTION_NAME,
384384
applyCommitRequest,
385385
STATE_REQUEST_OPTIONS,
386-
new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)
386+
new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.CLUSTER_COORDINATION)
387387
);
388388
}
389389

@@ -456,10 +456,10 @@ private void sendClusterState(
456456
PUBLISH_STATE_ACTION_NAME,
457457
new BytesTransportRequest(bytes, destination.getVersion()),
458458
STATE_REQUEST_OPTIONS,
459-
new ActionListenerResponseHandler<PublishWithJoinResponse>(
459+
new ActionListenerResponseHandler<>(
460460
ActionListener.runAfter(listener, bytes::decRef),
461461
PublishWithJoinResponse::new,
462-
ThreadPool.Names.GENERIC
462+
ThreadPool.Names.CLUSTER_COORDINATION
463463
)
464464
);
465465
} catch (Exception e) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
5858
public static class Names {
5959
public static final String SAME = "same";
6060
public static final String GENERIC = "generic";
61+
public static final String CLUSTER_COORDINATION = "cluster_coordination";
6162
public static final String GET = "get";
6263
public static final String ANALYZE = "analyze";
6364
public static final String WRITE = "write";
@@ -219,6 +220,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
219220
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)
220221
);
221222
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
223+
builders.put(Names.CLUSTER_COORDINATION, new FixedExecutorBuilder(settings, Names.CLUSTER_COORDINATION, 1, -1, false));
222224
builders.put(
223225
Names.FETCH_SHARD_STORE,
224226
new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)

0 commit comments

Comments
 (0)