Skip to content

Use batched executor for shutdown node actions #86018

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,97 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.shutdown.DeleteShutdownNodeAction.Request;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.NodesShutdownMetadata.getShutdownsOrEmpty;

public class TransportDeleteShutdownNodeAction extends AcknowledgedTransportMasterNodeAction<DeleteShutdownNodeAction.Request> {
public class TransportDeleteShutdownNodeAction extends AcknowledgedTransportMasterNodeAction<Request> {
private static final Logger logger = LogManager.getLogger(TransportDeleteShutdownNodeAction.class);

private final DeleteShutdownNodeExecutor executor = new DeleteShutdownNodeExecutor();

/**
 * Removes the shutdown record for the node named in {@code request} from {@code shutdownMetadata}, if one is present.
 *
 * @param shutdownMetadata mutable map of shutdown records keyed by node id; modified in place
 * @param request          the delete request carrying the target node id
 * @return {@code true} if the map contained an entry for the node and it was removed, {@code false} otherwise
 */
private static boolean deleteShutdownNodeState(Map<String, SingleNodeShutdownMetadata> shutdownMetadata, Request request) {
    final var nodeId = request.getNodeId();
    final boolean present = shutdownMetadata.containsKey(nodeId);
    if (present) {
        logger.info("removing shutdown record for node [{}]", nodeId);
        shutdownMetadata.remove(nodeId);
    }
    // when absent this is a noop: the node has already been removed by the time we got to this update task
    return present;
}

/**
 * Requests an urgent reroute and then acknowledges the delete request.
 * A reroute failure is only logged at warn level; it is not passed on to {@code listener}.
 *
 * @param request        the delete request, used for the node id in the failure log message
 * @param listener       receives {@link AcknowledgedResponse#TRUE} after the reroute has been requested
 * @param rerouteService the service asked to perform the reroute
 */
private static void ackAndReroute(Request request, ActionListener<AcknowledgedResponse> listener, RerouteService rerouteService) {
    final ActionListener<ClusterState> rerouteListener = new ActionListener<>() {
        @Override
        public void onResponse(ClusterState clusterState) {
            // nothing to do when the reroute succeeds
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn(() -> "failed to reroute after deleting node [" + request.getNodeId() + "] shutdown", e);
        }
    };
    rerouteService.reroute("node registered for removal from cluster", Priority.URGENT, rerouteListener);
    listener.onResponse(AcknowledgedResponse.TRUE);
}

// package private for tests
record DeleteShutdownNodeTask(Request request, ActionListener<AcknowledgedResponse> listener) implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
logger.error(new ParameterizedMessage("failed to delete shutdown for node [{}]", request.getNodeId()), e);
listener.onFailure(e);
}
}

/**
 * Executor that applies a list of {@link DeleteShutdownNodeTask}s against one copy of the shutdown metadata
 * and builds at most one new cluster state for the whole list.
 * Package private for tests.
 */
class DeleteShutdownNodeExecutor implements ClusterStateTaskExecutor<DeleteShutdownNodeTask> {
/**
 * Removes the shutdown record requested by each task and returns the resulting cluster state.
 *
 * @param currentState the state the removals are computed against
 * @param taskContexts the tasks to apply; each one is individually marked as failed or successful
 * @return {@code currentState} itself when no record was removed, otherwise a new state with updated shutdown metadata
 */
@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<DeleteShutdownNodeTask>> taskContexts) throws Exception {
// mutable copy, so removals do not touch the map held by currentState
var shutdownMetadata = new HashMap<>(getShutdownsOrEmpty(currentState).getAllNodeMetadataMap());
boolean changed = false;
for (final var taskContext : taskContexts) {
var request = taskContext.getTask().request();
try {
changed |= deleteShutdownNodeState(shutdownMetadata, request);
} catch (Exception e) {
// fail only this task and keep processing the remaining ones
taskContext.onFailure(e);
continue;
}
var reroute = clusterService.getRerouteService();
// mark the task successful with a listener whose response handler acks the request and requests a reroute
// NOTE(review): presumably this listener is completed once the new state is published - confirm against TaskContext#success
taskContext.success(taskContext.getTask().listener().delegateFailure((l, s) -> ackAndReroute(request, l, reroute)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observation: this is definitely emerging boilerplate:

taskContext.success(taskContext.getTask().listener().delegateFailure((l, s) -> ...

At the moment not every task has an underlying listener() so we can't yet do this in a generic way, but many tasks do and the remainder should be fixable. I hope to be able to address that to the point where this noise goes away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole thing feels like boilerplate to me. 😄 I think with a nice base task class everything could be extracted out so this pattern is solidified and a common executor implementation can be used (at least in most cases).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted ;)

}
// no task removed anything: hand back the same instance rather than building an equivalent state
if (changed == false) {
return currentState;
}
return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.metadata())
.putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(shutdownMetadata))
)
.build();
}
}

@Inject
public TransportDeleteShutdownNodeAction(
TransportService transportService,
Expand All @@ -50,77 +121,29 @@ public TransportDeleteShutdownNodeAction(
clusterService,
threadPool,
actionFilters,
DeleteShutdownNodeAction.Request::new,
Request::new,
indexNameExpressionResolver,
ThreadPool.Names.SAME
);
}

@Override
protected void masterOperation(
Task task,
DeleteShutdownNodeAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
{ // This block solely to ensure this NodesShutdownMetadata isn't accidentally used in the cluster state update task below
NodesShutdownMetadata nodesShutdownMetadata = state.metadata().custom(NodesShutdownMetadata.TYPE);
if (nodesShutdownMetadata == null || nodesShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId()) == null) {
throw new ResourceNotFoundException("node [" + request.getNodeId() + "] is not currently shutting down");
}
}

clusterService.submitStateUpdateTask("delete-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
NodesShutdownMetadata currentShutdownMetadata = getShutdownsOrEmpty(currentState);
var existing = currentShutdownMetadata.getAllNodeMetadataMap().get(request.getNodeId());
if (existing == null) {
// noop, the node has already been removed by the time we got to this update task
return currentState;
}

logger.info("removing shutdown record for node [{}]", request.getNodeId());

return ClusterState.builder(currentState)
.metadata(
Metadata.builder(currentState.metadata())
.putCustom(NodesShutdownMetadata.TYPE, currentShutdownMetadata.removeSingleNodeMetadata(request.getNodeId()))
)
.build();

}

@Override
public void onFailure(Exception e) {
logger.error(new ParameterizedMessage("failed to delete shutdown for node [{}]", request.getNodeId()), e);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
clusterService.getRerouteService()
.reroute("node registered for removal from cluster", Priority.URGENT, new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {}

@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to reroute after deleting node [" + request.getNodeId() + "] shutdown", e);
}
});
listener.onResponse(AcknowledgedResponse.TRUE);
}
}, newExecutor());
var deleteTask = new DeleteShutdownNodeTask(request, listener);
var taskConfig = ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout());
clusterService.submitStateUpdateTask("delete-node-shutdown-" + request.getNodeId(), deleteTask, taskConfig, executor);
}

@Override
protected ClusterBlockException checkBlock(DeleteShutdownNodeAction.Request request, ClusterState state) {
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

/**
 * Returns the executor obtained from {@link ClusterStateTaskExecutor#unbatched()}.
 * The forbidden-API suppression is confined to this helper.
 *
 * @param <T> the kind of {@link ClusterStateUpdateTask} the executor will run
 */
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> newExecutor() {
return ClusterStateTaskExecutor.unbatched();
}
}
Loading