Skip to content

Update the signature of the submitStateUpdateTask across the codebase. #82942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected void masterOperation(
}
}, finalListener::onFailure);
CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected void masterOperation(
String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trialRolloverResponse, listener);
ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL, rolloverRequest.masterNodeTimeout());
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor, rolloverTask);
clusterService.submitStateUpdateTask(source, rolloverTask, config, rolloverTaskExecutor);
} else {
// conditions not met
listener.onResponse(trialRolloverResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ private Builder<T> result(T task, TaskResult executionResult) {
public ClusterTasksResult<T> build(ClusterState resultingState) {
return new ClusterTasksResult<>(resultingState, executionResults);
}

ClusterTasksResult<T> build(ClusterTasksResult<T> result, ClusterState previousState) {
return new ClusterTasksResult<>(result.resultingState == null ? previousState : result.resultingState, executionResults);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public ClusterTasksResult<LocalMasterServiceTask> execute(ClusterState currentSt
LocalMasterServiceTask.this.execute(currentState);
return ClusterTasksResult.<LocalMasterServiceTask>builder().successes(tasks).build(currentState);
}
},
this
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,7 @@ public void messageReceived(FailedShardEntry request, TransportChannel channel,
TASK_SOURCE,
update,
ClusterStateTaskConfig.build(Priority.HIGH),
shardFailedClusterStateTaskExecutor,
update
shardFailedClusterStateTaskExecutor
);
}
}
Expand Down Expand Up @@ -607,8 +606,7 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel,
"shard-started " + request,
update,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
update
shardStartedClusterStateTaskExecutor
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ private void removeNode(DiscoveryNode discoveryNode, String reason) {
"node-left",
task,
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
task
nodeRemovalExecutor
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -403,7 +401,7 @@ public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinLis
joinListener
);
assert joinTaskExecutor != null;
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor, task);
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

@Override
Expand Down Expand Up @@ -456,18 +454,17 @@ public void close(Mode newMode) {
assert closed == false : "CandidateJoinAccumulator closed";
closed = true;
if (newMode == Mode.LEADER) {
final Map<JoinTaskExecutor.Task, ClusterStateTaskListener> pendingAsTasks = new LinkedHashMap<>();
final Consumer<JoinTaskExecutor.Task> pendingTaskAdder = task -> pendingAsTasks.put(task, task);
final List<JoinTaskExecutor.Task> pendingAsTasks = new ArrayList<>();
joinRequestAccumulator.forEach(
(node, listener) -> pendingTaskAdder.accept(
(node, listener) -> pendingAsTasks.add(
new JoinTaskExecutor.Task(node, joinReasonService.getJoinReason(node, Mode.CANDIDATE), listener)
)
);

final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";

pendingTaskAdder.accept(JoinTaskExecutor.newBecomeMasterTask());
pendingTaskAdder.accept(JoinTaskExecutor.newFinishElectionTask());
pendingAsTasks.add(JoinTaskExecutor.newBecomeMasterTask());
pendingAsTasks.add(JoinTaskExecutor.newFinishElectionTask());
joinTaskExecutor = joinTaskExecutorGenerator.get();
masterService.submitStateUpdateTasks(
stateUpdateSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ public void putMapping(final PutMappingClusterStateUpdateRequest request, final
"put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()),
task,
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
putMappingExecutor,
task
putMappingExecutor
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.List;

public class ClusterService extends AbstractLifecycleComponent {
private final MasterService masterService;
Expand Down Expand Up @@ -223,8 +223,7 @@ public final String getNodeName() {
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}.
* Submits a cluster state update task
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* @param executor the executor to use for the submitted task.
Expand All @@ -234,7 +233,7 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
T updateTask,
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTask(source, updateTask, updateTask, executor, updateTask);
submitStateUpdateTask(source, updateTask, updateTask, executor);
}

/**
Expand All @@ -246,24 +245,21 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
* tasks will all be executed on the executor in a single batch
*
* @param source the source of the cluster state update task
* @param task the state needed for the cluster state update task
* @param task the state and the callback needed for the cluster state update task
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
String source,
T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener
ClusterStateTaskExecutor<T> executor
) {
masterService.submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
masterService.submitStateUpdateTasks(source, List.of(task), config, executor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -426,9 +426,8 @@ public Builder incrementVersion(ClusterState clusterState) {
}

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig,
* ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched.
* @param source the source of the cluster state update task
* Submits a cluster state update task
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* @param executor
*
Expand All @@ -438,7 +437,7 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
T updateTask,
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTask(source, updateTask, updateTask, executor, updateTask);
submitStateUpdateTask(source, updateTask, updateTask, executor);
}

/**
Expand All @@ -450,24 +449,21 @@ public <T extends ClusterStateTaskConfig & ClusterStateTaskListener> void submit
* tasks will all be executed on the executor in a single batch
*
* @param source the source of the cluster state update task
* @param task the state needed for the cluster state update task
* @param task the state and the callback needed for the cluster state update task
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTask(
public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
String source,
T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener
ClusterStateTaskExecutor<T> executor
) {
submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
submitStateUpdateTasks(source, List.of(task), config, executor);
}

/**
Expand Down Expand Up @@ -896,17 +892,17 @@ void onNoLongerMaster() {
* potentially with more tasks of the same executor.
*
* @param source the source of the cluster state update task
* @param tasks a map of update tasks and their corresponding listeners
* @param tasks a collection of update tasks and their corresponding listeners
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
*
*/
public <T> void submitStateUpdateTasks(
public <T extends ClusterStateTaskListener> void submitStateUpdateTasks(
final String source,
final Map<T, ClusterStateTaskListener> tasks,
final Collection<T> tasks,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
Expand All @@ -918,10 +914,9 @@ public <T> void submitStateUpdateTasks(
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();

List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
.collect(Collectors.toList());
List<Batcher.UpdateTask> safeTasks = tasks.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e, safe(e, supplier), executor))
.toList();
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3418,8 +3418,7 @@ private void innerUpdateSnapshotState(
"update snapshot state",
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
SHARD_STATE_EXECUTOR,
update
SHARD_STATE_EXECUTOR
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -275,20 +273,22 @@ public void onFailure(Exception e) {}
"testClusterStateTaskListenerThrowingExceptionIsOkay",
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
new ClusterStateTaskExecutor<Object>() {
new ClusterStateTaskExecutor<>() {
@Override
public ClusterTasksResult<Object> execute(ClusterState currentState, List<Object> tasks) {
public ClusterTasksResult<ClusterStateTaskListener> execute(
ClusterState currentState,
List<ClusterStateTaskListener> tasks
) {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return ClusterTasksResult.builder().successes(tasks).build(newClusterState);
return ClusterTasksResult.<ClusterStateTaskListener>builder().successes(tasks).build(newClusterState);
}

@Override
public void clusterStatePublished(ClusterStatePublicationEvent clusterStatePublicationEvent) {
published.set(true);
latch.countDown();
}
},
update
}
);

latch.await();
Expand Down Expand Up @@ -602,18 +602,16 @@ public void clusterStatePublished(ClusterStatePublicationEvent clusterPublicatio
var executor = assignment.v1();
submittedTasks.addAndGet(tasks.size());
if (tasks.size() == 1) {
var update = tasks.iterator().next();
masterService.submitStateUpdateTask(
threadName,
update,
tasks.iterator().next(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
update
executor
);
} else {
masterService.submitStateUpdateTasks(
threadName,
tasks.stream().collect(toMap(Function.<Task>identity(), Function.<ClusterStateTaskListener>identity())),
tasks,
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor
);
Expand Down Expand Up @@ -685,8 +683,7 @@ public void onFailure(Exception e) {}
(currentState, tasks) -> {
ClusterState newClusterState = ClusterState.builder(currentState).build();
return ClusterTasksResult.<ClusterStateTaskListener>builder().successes(tasks).build(newClusterState);
},
update
}
);

latch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterState
busyIndices.remove(dedupKey);
assert removed : "tried to unregister unknown task [" + task + "]";
}));
clusterService.submitStateUpdateTask(source, task, ILM_TASK_CONFIG, ILM_TASK_EXECUTOR, task);
clusterService.submitStateUpdateTask(source, task, ILM_TASK_CONFIG, ILM_TASK_EXECUTOR);
} else {
logger.trace("skipped redundant execution of [{}]", source);
}
Expand Down
Loading