Add new Threadpool for Cluster Coordination Activities #83576

Merged

This change moves cluster coordination work off the shared GENERIC thread pool and onto a new dedicated CLUSTER_COORDINATION thread pool, a fixed executor with a single thread and an unbounded queue. The diff below switches election scheduling, join handling, follower and leader checks, lag detection, pre-voting, and cluster state publication over to the new pool, covering scheduled tasks, request handlers, and transport response handlers alike.
@@ -84,7 +84,7 @@ private boolean isActive() {
     }

     void scheduleNextWarning() {
-        threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.GENERIC, new AbstractRunnable() {
+        threadPool.scheduleUnlessShuttingDown(clusterFormationWarningTimeout, Names.CLUSTER_COORDINATION, new AbstractRunnable() {
             @Override
             public void onFailure(Exception e) {
                 logger.debug("unexpected exception scheduling cluster formation warning", e);
@@ -497,7 +497,7 @@ private void abdicateTo(DiscoveryNode newMaster) {
         final StartJoinRequest startJoinRequest = new StartJoinRequest(newMaster, Math.max(getCurrentTerm(), maxTermSeen) + 1);
         logger.info("abdicating to {} with term {}", newMaster, startJoinRequest.getTerm());
         getLastAcceptedState().nodes().mastersFirstStream().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
-        // handling of start join messages on the local node will be dispatched to the generic thread-pool
+        // handling of start join messages on the local node will be dispatched to the coordination thread-pool
         assert mode == Mode.LEADER : "should still be leader after sending abdication messages " + mode;
         // explicitly move node to candidate state so that the next cluster state update task yields an onNoLongerMaster event
         becomeCandidate("after abdicating to " + newMaster);
@@ -635,7 +635,7 @@ private void sendJoinValidate(DiscoveryNode discoveryNode, ClusterState clusterS
             new ActionListenerResponseHandler<>(listener.delegateResponse((l, e) -> {
                 logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", discoveryNode), e);
                 listener.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
-            }), i -> Empty.INSTANCE, Names.GENERIC)
+            }), i -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
         );
     }

@@ -651,7 +651,7 @@ private void sendJoinPing(DiscoveryNode discoveryNode, TransportRequestOptions.T
                     e
                 );
                 listener.onFailure(new IllegalStateException("failure when sending a join ping request to node", e));
-            }), i -> Empty.INSTANCE, Names.GENERIC)
+            }), i -> Empty.INSTANCE, Names.CLUSTER_COORDINATION)
         );
     }

@@ -1583,7 +1583,7 @@ public void run() {
             public String toString() {
                 return "scheduled timeout for " + CoordinatorPublication.this;
             }
-        }, publishTimeout, Names.GENERIC);
+        }, publishTimeout, Names.CLUSTER_COORDINATION);

         this.infoTimeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
             @Override
@@ -1597,7 +1597,7 @@ public void run() {
             public String toString() {
                 return "scheduled timeout for reporting on " + CoordinatorPublication.this;
             }
-        }, publishInfoTimeout, Names.GENERIC);
+        }, publishInfoTimeout, Names.CLUSTER_COORDINATION);
     }

     private void removePublicationAndPossiblyBecomeCandidate(String reason) {
@@ -172,6 +172,12 @@ void scheduleNextElection(final TimeValue gracePeriod, final Runnable scheduledR
         final long maxDelayMillis = Math.min(maxTimeout.millis(), initialTimeout.millis() + thisAttempt * backoffTime.millis());
         final long delayMillis = toPositiveLongAtMost(random.nextLong(), maxDelayMillis) + gracePeriod.millis();
         final Runnable runnable = new AbstractRunnable() {
+
+            @Override
+            public void onRejection(Exception e) {
+                logger.debug("threadpool was shut down", e);
+            }
+
             @Override
             public void onFailure(Exception e) {
                 logger.debug(new ParameterizedMessage("unexpected exception in wakeup of {}", this), e);

Review thread on the new onRejection override:

Contributor: I am not sure I follow this. Is that not identical to the default ThreadedRunnable behavior (used by schedule), except without checking that the threadpool was indeed shut down?

Contributor (Author): Yeah, the problem is that inside ThreadedRunnable.run we just call:

    executor.execute(runnable);

If this is called with an AbstractRunnable, onRejection is invoked on it instead of the rejection being thrown, so the handling in ThreadedRunnable doesn't come into play; we need to catch this case manually here.

Contributor: Ah, thanks, I have been here before 🙂

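To make that exchange concrete, here is a minimal Java sketch of the behavior under discussion; the Sketch* names are hypothetical stand-ins, not the real Elasticsearch classes. The point is that when the executor rejects a task that carries an AbstractRunnable-style callback, the rejection arrives as a callback rather than an exception, so a try/catch in the scheduling wrapper never fires.

    import java.util.concurrent.RejectedExecutionException;

    // Hypothetical stand-in for AbstractRunnable's rejection callback.
    abstract class SketchAbstractRunnable implements Runnable {
        // Default behavior: treat rejection as an error. The override added in
        // this PR replaces this with a debug log, since rejection is expected
        // while the threadpool is shutting down.
        public void onRejection(Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Hypothetical stand-in for the executor's dispatch logic.
    final class SketchExecutor {
        volatile boolean shuttingDown;

        void execute(Runnable command) {
            if (shuttingDown) {
                RejectedExecutionException rejected = new RejectedExecutionException("shutting down");
                if (command instanceof SketchAbstractRunnable task) {
                    // The rejection is routed to the callback, so the scheduling
                    // wrapper that called execute() never catches it...
                    task.onRejection(rejected);
                } else {
                    // ...whereas for a plain Runnable it propagates to the caller,
                    // which can then check whether the pool is shutting down.
                    throw rejected;
                }
            } else {
                command.run();
            }
        }
    }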
@@ -206,7 +212,7 @@ public String toString() {
         };

         logger.debug("scheduling {}", runnable);
-        threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), Names.GENERIC, runnable);
+        threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), Names.CLUSTER_COORDINATION, runnable);
     }

     @Override
@@ -193,7 +193,7 @@ private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel
             throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this);
         }

-        transportService.getThreadPool().generic().execute(new AbstractRunnable() {
+        transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
             @Override
             protected void doRun() throws IOException {
                 logger.trace("responding to {} on slow path", request);
@@ -358,7 +358,7 @@ public void handleException(TransportException exp) {
         }

         void failNode(String reason) {
-            transportService.getThreadPool().generic().execute(new Runnable() {
+            transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() {
                 @Override
                 public void run() {
                     synchronized (mutex) {
@@ -129,7 +129,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join

         transportService.registerRequestHandler(
             JOIN_ACTION_NAME,
-            ThreadPool.Names.GENERIC,
+            Names.CLUSTER_COORDINATION,
             false,
             false,
             JoinRequest::new,
@@ -141,7 +141,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join

         transportService.registerRequestHandler(
             START_JOIN_ACTION_NAME,
-            Names.GENERIC,
+            Names.CLUSTER_COORDINATION,
             false,
             false,
             StartJoinRequest::new,
@@ -164,7 +164,7 @@ public ClusterTasksResult<JoinTask> execute(ClusterState currentState, List<Join
         final List<String> dataPaths = Environment.PATH_DATA_SETTING.get(settings);
         transportService.registerRequestHandler(
             JOIN_VALIDATE_ACTION_NAME,
-            ThreadPool.Names.GENERIC,
+            ThreadPool.Names.CLUSTER_COORDINATION,
             ValidateJoinRequest::new,
             (request, channel, task) -> {
                 final ClusterState localState = currentStateSupplier.get();
@@ -105,7 +105,7 @@ public void startLagDetector(final long version) {
         } else {
             logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);

-            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {
+            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.CLUSTER_COORDINATION, new Runnable() {
                 @Override
                 public void run() {
                     laggingTrackers.forEach(t -> t.checkForLag(version));
@@ -331,7 +331,7 @@ public void handleException(TransportException exp) {

     void leaderFailed(MessageSupplier messageSupplier, Exception e) {
         if (isClosed.compareAndSet(false, true)) {
-            transportService.getThreadPool().generic().execute(new Runnable() {
+            transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new Runnable() {
                 @Override
                 public void run() {
                     leaderFailureListener.onLeaderFailure(messageSupplier, e);
@@ -423,7 +423,7 @@ interface LeaderFailureListener {
     /**
      * Called when a leader failure is detected. Checking the leader health is somewhat asynchronous, so this method may report a leader
      * failure after the node has already decided there's no known leader for some other reason. This method is called on the {@code
-     * GENERIC} thread pool.
+     * COORDINATION} thread pool.
      *
      * @param messageSupplier The message to log if prior to this failure there was a known master in the cluster.
      * @param exception An exception that gives more detail of the leader failure.
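As an illustration of this contract, below is a hypothetical listener implementation. MessageSupplier is log4j's supplier type as used by leaderFailed above, and the sketch assumes the LeaderFailureListener interface from this hunk is in scope; the class name and body are stand-ins, not the real coordination wiring (which does more than log, e.g. moving the node back to candidate state).

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.util.MessageSupplier;

    // Hypothetical implementation of the LeaderFailureListener contract quoted
    // above, for illustration only.
    final class LoggingLeaderFailureListener implements LeaderFailureListener {
        private static final Logger logger = LogManager.getLogger(LoggingLeaderFailureListener.class);

        @Override
        public void onLeaderFailure(MessageSupplier messageSupplier, Exception exception) {
            // Per the javadoc, this is invoked on the coordination thread pool
            // rather than a transport thread, and it may arrive after the node
            // has already decided there is no known leader for another reason.
            logger.warn(messageSupplier.get(), exception);
        }
    }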
@@ -63,7 +63,7 @@ public class PreVoteCollector {

         transportService.registerRequestHandler(
             REQUEST_PRE_VOTE_ACTION_NAME,
-            Names.GENERIC,
+            Names.CLUSTER_COORDINATION,
             false,
             false,
             PreVoteRequest::new,
@@ -174,7 +174,7 @@ public void handleException(TransportException exp) {

     @Override
     public String executor() {
-        return Names.GENERIC;
+        return Names.CLUSTER_COORDINATION;
     }

     @Override
@@ -96,7 +96,7 @@ public PublicationTransportHandler(

         transportService.registerRequestHandler(
             PUBLISH_STATE_ACTION_NAME,
-            ThreadPool.Names.GENERIC,
+            ThreadPool.Names.CLUSTER_COORDINATION,
             false,
             false,
             BytesTransportRequest::new,
@@ -105,7 +105,7 @@ public PublicationTransportHandler(

         transportService.registerRequestHandler(
             COMMIT_STATE_ACTION_NAME,
-            ThreadPool.Names.GENERIC,
+            ThreadPool.Names.CLUSTER_COORDINATION,
             false,
             false,
             ApplyCommitRequest::new,
@@ -383,7 +383,7 @@ public void sendApplyCommit(
             COMMIT_STATE_ACTION_NAME,
             applyCommitRequest,
             STATE_REQUEST_OPTIONS,
-            new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)
+            new ActionListenerResponseHandler<>(listener, in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.CLUSTER_COORDINATION)
         );
     }

@@ -456,10 +456,10 @@ private void sendClusterState(
                 PUBLISH_STATE_ACTION_NAME,
                 new BytesTransportRequest(bytes, destination.getVersion()),
                 STATE_REQUEST_OPTIONS,
-                new ActionListenerResponseHandler<PublishWithJoinResponse>(
+                new ActionListenerResponseHandler<>(
                     ActionListener.runAfter(listener, bytes::decRef),
                     PublishWithJoinResponse::new,
-                    ThreadPool.Names.GENERIC
+                    ThreadPool.Names.CLUSTER_COORDINATION
                 )
             );
         } catch (Exception e) {
@@ -58,6 +58,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
     public static class Names {
         public static final String SAME = "same";
         public static final String GENERIC = "generic";
+        public static final String CLUSTER_COORDINATION = "cluster_coordination";
         public static final String GET = "get";
         public static final String ANALYZE = "analyze";
         public static final String WRITE = "write";
@@ -219,6 +220,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
             new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)
         );
         builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
+        builders.put(Names.CLUSTER_COORDINATION, new FixedExecutorBuilder(settings, Names.CLUSTER_COORDINATION, 1, -1, false));
         builders.put(
             Names.FETCH_SHARD_STORE,
             new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)
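The final hunk registers the pool itself: a fixed executor named cluster_coordination with a single thread and a queue size of -1, which in this builder means an unbounded queue. Below is a rough java.util.concurrent analogue of that configuration, for illustration only; the real pool is constructed through Elasticsearch's FixedExecutorBuilder and exposed through node settings. The effect of the diff as a whole is that coordination tasks get their own serialized worker instead of contending with unrelated work on the GENERIC pool.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    final class ClusterCoordinationPoolSketch {
        // Approximates FixedExecutorBuilder(settings, "cluster_coordination", 1, -1, false):
        // a single worker thread drains tasks in submission order, and the
        // unbounded queue means coordination work is delayed rather than
        // rejected when the node is busy.
        static ExecutorService newPool() {
            return new ThreadPoolExecutor(
                1, 1,                        // core size == max size == one thread
                0L, TimeUnit.MILLISECONDS,   // fixed-size pool; keep-alive unused
                new LinkedBlockingQueue<>()  // unbounded FIFO queue (queue size -1)
            );
        }
    }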