Skip to content

Complete listener on rejection in FollowersChecker #86259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -27,7 +30,6 @@
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand Down Expand Up @@ -123,7 +125,10 @@ public FollowersChecker(
false,
false,
FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel)
(request, transportChannel, task) -> handleFollowerCheck(
request,
new ChannelActionListener<>(transportChannel, FOLLOWER_CHECK_ACTION_NAME, request)
)
);
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
Expand Down Expand Up @@ -172,7 +177,7 @@ public void updateFastResponseState(final long term, final Mode mode) {
fastResponseState = new FastResponseState(term, mode);
}

private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel transportChannel) throws IOException {
private void handleFollowerCheck(FollowerCheckRequest request, ActionListener<Empty> listener) {
final StatusInfo statusInfo = nodeHealthService.getHealth();
if (statusInfo.getStatus() == UNHEALTHY) {
final String message = "handleFollowerCheck: node is unhealthy ["
Expand All @@ -186,37 +191,19 @@ private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel
final FastResponseState responder = this.fastResponseState;
if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
listener.onResponse(Empty.INSTANCE);
return;
}

if (request.term < responder.term) {
throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this);
}

transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
@Override
protected void doRun() throws IOException {
logger.trace("responding to {} on slow path", request);
try {
handleRequestAndUpdateState.accept(request);
} catch (Exception e) {
transportChannel.sendResponse(e);
return;
}
transportChannel.sendResponse(Empty.INSTANCE);
}

@Override
public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("exception while responding to {}", request), e);
}

@Override
public String toString() {
return "slow path response to " + request;
}
});
transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(ActionRunnable.supply(listener, () -> {
logger.trace("responding to {} on slow path", request);
handleRequestAndUpdateState.accept(request);
return Empty.INSTANCE;
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.NodeHealthService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -575,16 +577,21 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
}
};

final TransportService transportService = mockTransport.createTransportService(
settings,
deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddress -> follower,
null,
emptySet()
);
transportService.start();
transportService.acceptIncomingRequests();
final AtomicBoolean rejectExecution = new AtomicBoolean();

final TransportService transportService = mockTransport.createTransportService(settings, deterministicTaskQueue.getThreadPool(r -> {
if (rejectExecution.get()) {
final var exception = new EsRejectedExecutionException("simulated rejection", true);
if (r instanceof AbstractRunnable ar) {
ar.onRejection(exception);
return () -> {};
} else {
throw exception;
}
} else {
return r;
}
}), TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> follower, null, emptySet());

final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();
Expand All @@ -597,6 +604,9 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
}
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"));

transportService.start();
transportService.acceptIncomingRequests();

{
// Does not call into the coordinator in the normal case
final long term = randomNonNegativeLong();
Expand Down Expand Up @@ -699,6 +709,37 @@ public void handleException(TransportException exp) {
assertTrue(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
assertThat(receivedException.get().getRootCause().getMessage(), equalTo(exceptionMessage));
calledCoordinator.set(false);
}

{
// If it calls into the coordinator but the threadpool is shut down then the rejection is passed back to the caller
rejectExecution.set(true);
final long term = randomNonNegativeLong();
followersChecker.updateFastResponseState(term, randomFrom(Mode.LEADER, Mode.CANDIDATE));

final AtomicReference<TransportException> receivedException = new AtomicReference<>();
transportService.sendRequest(
follower,
FOLLOWER_CHECK_ACTION_NAME,
new FollowerCheckRequest(term, leader),
new TransportResponseHandler.Empty() {
@Override
public void handleResponse(TransportResponse.Empty response) {
fail("unexpected success");
}

@Override
public void handleException(TransportException exp) {
assertThat(exp, not(nullValue()));
assertTrue(receivedException.compareAndSet(null, exp));
}
}
);
deterministicTaskQueue.runAllTasks();
assertFalse(calledCoordinator.get());
assertThat(receivedException.get(), not(nullValue()));
assertThat(receivedException.get().getRootCause().getMessage(), equalTo("simulated rejection"));
}
}

Expand Down