Skip to content

Deserialize publish requests on generic thread-pool #108814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
6 changes: 6 additions & 0 deletions docs/changelog/108814.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 108814
summary: Deserialize publish requests on generic thread-pool
area: Cluster Coordination
type: bug
issues:
- 106352
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ private void onClusterStateApplied() {
}

PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
assert ThreadPool.assertCurrentThreadPool(Names.CLUSTER_COORDINATION);
assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode())
: publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.Diff;
Expand Down Expand Up @@ -55,6 +56,7 @@
import java.util.function.Function;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;

/**
* Implements the low-level mechanics of sending a cluster state to other nodes in the cluster during a publication.
Expand Down Expand Up @@ -105,11 +107,11 @@ public PublicationTransportHandler(

transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME,
this.clusterCoordinationExecutor,
transportService.getThreadPool().generic(),
false,
false,
BytesTransportRequest::new,
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
(request, channel, task) -> this.handleIncomingPublishRequest(request, new ChannelActionListener<>(channel))
);
}

Expand All @@ -122,7 +124,11 @@ public PublishClusterStateStats stats() {
);
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
private void handleIncomingPublishRequest(
BytesTransportRequest request,
ActionListener<PublishWithJoinResponse> publishResponseListener
) throws IOException {
assert ThreadPool.assertCurrentThreadPool(GENERIC);
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
try {
Expand All @@ -145,72 +151,88 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
lastSeenClusterState.set(incomingState);
return response;
acceptState(incomingState, publishResponseListener.map(response -> {
lastSeenClusterState.set(incomingState);
return response;
}));
} else {
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw new IncompatibleClusterStateVersionException("have no local cluster state");
} else {
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
assert input.read() == -1;
}
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
assert false : e;
throw e;
}
final ClusterState incomingState = deserializeAndApplyDiff(request, in, lastSeen);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug(
"received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(),
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
acceptState(incomingState, publishResponseListener.map(response -> {
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}));
}
}
} finally {
IOUtils.close(in);
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private ClusterState deserializeAndApplyDiff(BytesTransportRequest request, StreamInput in, ClusterState currentState)
throws IOException {
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, currentState.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
assert input.read() == -1;
}
incomingState = diff.apply(currentState); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
assert false : e;
throw e;
}
return incomingState;
}

/**
* Delegate to cluster-coordination thread to apply received state
*
* @param incomingState The received cluster state
* @param actionListener The action to perform once the publish call completes
*/
private void acceptState(ClusterState incomingState, ActionListener<PublishWithJoinResponse> actionListener) {
assert incomingState.nodes().isLocalNodeElectedMaster() == false
: "should handle local publications locally, but got " + incomingState;
return handlePublishRequest.apply(new PublishRequest(incomingState));
clusterCoordinationExecutor.execute(
ActionRunnable.supply(actionListener, () -> handlePublishRequest.apply(new PublishRequest(incomingState)))
);
}

public PublicationContext newPublicationContext(ClusterStatePublicationEvent clusterStatePublicationEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ public void onFailure(Exception e) {
new PublishRequest(clusterState0),
ActionListener.running(() -> assertTrue(completed.compareAndSet(false, true)))
);
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(completed.getAndSet(false));
receivedState0 = receivedStateRef.getAndSet(null);
assertEquals(clusterState0.stateUUID(), receivedState0.stateUUID());
Expand Down Expand Up @@ -499,6 +500,7 @@ public void onFailure(Exception e) {
new PublishRequest(clusterState1),
ActionListener.running(() -> assertTrue(completed.compareAndSet(false, true)))
);
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(completed.getAndSet(false));
var receivedState1 = receivedStateRef.getAndSet(null);
assertEquals(clusterState1.stateUUID(), receivedState1.stateUUID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,14 @@ protected static int defaultInt(Setting<Integer> setting) {
// 1. submit the task to the master service
// 2. state publisher task on master
// 3. master sends out PublishRequests to nodes
// 4. master receives PublishResponses from nodes
// 5. master sends ApplyCommitRequests to nodes
// 6. nodes apply committed cluster state
// 7. master receives ApplyCommitResponses
// 8. apply committed state on master (last one to apply cluster state)
// 9. complete the publication listener back on the master service thread
public static final int CLUSTER_STATE_UPDATE_NUMBER_OF_DELAYS = 9;
// 4. nodes deserialize received cluster state
// 5. master receives PublishResponses from nodes
// 6. master sends ApplyCommitRequests to nodes
// 7. nodes apply committed cluster state
// 8. master receives ApplyCommitResponses
// 9. apply committed state on master (last one to apply cluster state)
// 10. complete the publication listener back on the master service thread
public static final int CLUSTER_STATE_UPDATE_NUMBER_OF_DELAYS = 10;
public static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = CLUSTER_STATE_UPDATE_NUMBER_OF_DELAYS * DEFAULT_DELAY_VARIABILITY;

private static final int ELECTION_RETRIES = 10;
Expand Down