Skip to content

Remove DiscoveryNodes#getAllNodes #83538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -166,14 +165,6 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
return nodes.build();
}

/**
* Return all the nodes as a collection
* @return
*/
public Collection<DiscoveryNode> getAllNodes() {
return this;
}

/**
* Returns a stream of all nodes, with master nodes at the front
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
// want to assign a persistent task to a node that will shortly be
// leaving the cluster
final List<DiscoveryNode> candidateNodes = currentState.nodes()
.getAllNodes()
.stream()
.filter(dn -> isNodeShuttingDown(currentState, dn.getId()) == false)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ public void testLocalShardIteratorFromPointInTime() {
} else {
// relocated or no longer assigned
relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
targetNode = randomFrom(clusterState.nodes()).getId();
}
contexts.put(
new ShardId(indexMetadata.getIndex(), shardId),
Expand Down Expand Up @@ -1343,7 +1343,7 @@ public void testLocalShardIteratorFromPointInTime() {
anotherShardId,
new SearchContextIdForNode(
null,
randomFrom(clusterState.nodes().getAllNodes()).getId(),
randomFrom(clusterState.nodes()).getId(),
new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ public void testTasksNotAssignedToShuttingDownNodes() {
// Now that we have a bunch of tasks that need to be assigned, let's
// mark half the nodes as shut down and make sure they do not have any
// tasks assigned
Collection<DiscoveryNode> allNodes = clusterState.nodes().getAllNodes();
Collection<DiscoveryNode> allNodes = clusterState.nodes();
Map<String, SingleNodeShutdownMetadata> shutdownMetadataMap = new HashMap<>();
allNodes.stream()
.limit(Math.floorDiv(allNodes.size(), 2))
Expand Down Expand Up @@ -816,10 +816,6 @@ private Assignment randomNodeAssignment(Collection<DiscoveryNode> nodes) {
return Optional.ofNullable(randomFrom(nodes)).map(node -> new Assignment(node.getId(), "test assignment")).orElse(NO_NODE_FOUND);
}

private Assignment randomNodeAssignment(DiscoveryNodes nodes) {
return randomNodeAssignment(nodes.getAllNodes());
}

private String dumpEvent(ClusterChangedEvent event) {
return "nodes_changed: "
+ event.nodesChanged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void runAssignmentTest(
clusterStateBuilder.nodes(nodesBuilder);
final Assignment assignment = executor.getAssignment(
mock(ShardFollowTask.class),
clusterStateBuilder.nodes().getAllNodes(),
clusterStateBuilder.nodes(),
clusterStateBuilder.build()
);
consumer.accept(theSpecial, assignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ DiscoveryNode selectNodeForPolicyExecution(DiscoveryNodes discoNodes) {
return discoNodes.getLocalNode();
}

final var nodes = discoNodes.getAllNodes()
.stream()
final var nodes = discoNodes.stream()
// filter out elected master node (which is the local node)
.filter(discoNode -> discoNode.getId().equals(discoNodes.getMasterNodeId()) == false)
// filter out dedicated master nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ protected void doExecute(
ClusterState latestState = clusterService.state();
Set<String> nodesShuttingDown = TransportStartTrainedModelDeploymentAction.nodesShuttingDown(latestState);
List<DiscoveryNode> nodes = latestState.getNodes()
.getAllNodes()
.stream()
.filter(d -> nodesShuttingDown.contains(d.getId()) == false)
.filter(StartTrainedModelDeploymentAction.TaskParams::mayAllocateToNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ public boolean test(ClusterState clusterState) {
}
Set<String> nodesShuttingDown = nodesShuttingDown(clusterState);
List<DiscoveryNode> nodes = clusterState.nodes()
.getAllNodes()
.stream()
.filter(d -> nodesShuttingDown.contains(d.getId()) == false)
.filter(TaskParams::mayAllocateToNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ ClusterState createModelAllocation(ClusterState currentState, StartTrainedModelD

Set<String> shuttingDownNodes = nodesShuttingDown(currentState);
Map<String, String> nodeToReason = new TreeMap<>();
for (DiscoveryNode node : currentState.getNodes().getAllNodes()) {
for (DiscoveryNode node : currentState.getNodes()) {
if (StartTrainedModelDeploymentAction.TaskParams.mayAllocateToNode(node) && shuttingDownNodes.contains(node.getId()) == false) {
Optional<String> maybeError = nodeHasCapacity(currentState, params, node);
if (maybeError.isPresent()) {
Expand Down Expand Up @@ -357,7 +357,6 @@ ClusterState addRemoveAllocationNodes(ClusterState currentState) {
final TrainedModelAllocationMetadata.Builder builder = TrainedModelAllocationMetadata.builder(currentState);
Set<String> shuttingDownNodes = nodesShuttingDown(currentState);
Map<String, DiscoveryNode> currentEligibleNodes = currentState.getNodes()
.getAllNodes()
.stream()
// TODO: Change when we update `mayAllocateToNode`
.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState)
validateJobAndId(jobId, job);
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes(), clusterState);
if (assignment.equals(AWAITING_UPGRADE)) {
throw makeCurrentlyBeingUpgradedException(logger, params.getJobId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testGetAssignment_UpgradeModeIsEnabled() {
.metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build()))
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(nullValue()));
assertThat(assignment.getExplanation(), is(equalTo("persistent task cannot be assigned while upgrade mode is enabled.")));
}
Expand All @@ -70,7 +70,7 @@ public void testGetAssignment_NoNodes() {
.metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(nullValue()));
assertThat(assignment.getExplanation(), is(emptyString()));
}
Expand All @@ -89,7 +89,7 @@ public void testGetAssignment_NoMlNodes() {
)
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(nullValue()));
assertThat(
assignment.getExplanation(),
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testGetAssignment_MlNodesAreTooOld() {
)
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(nullValue()));
assertThat(
assignment.getExplanation(),
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds()
.nodes(DiscoveryNodes.builder().add(createNode(0, true, Version.V_7_10_0)))
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
Assignment assignment = executor.getAssignment(params, clusterState.nodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(equalTo("_node_id0")));
assertThat(assignment.getExplanation(), is(emptyString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLim

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityCount

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
dataFrameAnalyticsId,
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -206,7 +206,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -255,7 +255,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_givenTaskHasNull

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
dataFrameAnalyticsId,
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -287,7 +287,7 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemor

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -339,7 +339,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
dataFrameAnalyticsId,
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -390,7 +390,7 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
dataFrameAnalyticsId,
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -460,7 +460,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() {

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -526,7 +526,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
ClusterState cs = csBuilder.build();
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job6.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand All @@ -547,7 +547,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date());
jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job7.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand All @@ -570,7 +570,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
cs = csBuilder.build();
jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job7.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand All @@ -590,7 +590,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
cs = csBuilder.build();
jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job7.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -663,7 +663,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
// Allocation won't be possible if the stale failed job is treated as opening
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job7.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand All @@ -683,7 +683,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date());
jobNodeSelector = new JobNodeSelector(
cs,
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job8.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -738,7 +738,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
cs.metadata(metadata);
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -793,7 +793,7 @@ public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() {
cs.metadata(metadata);
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -858,7 +858,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion()
cs.metadata(metadata);
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -908,7 +908,7 @@ public void testSelectLeastLoadedMlNode_jobWithRules() {
Job job = jobWithRules("job_with_rules");
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -1001,7 +1001,7 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -1045,7 +1045,7 @@ public void testConsiderLazyAssignmentWithLazyNodes() {
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -1074,7 +1074,7 @@ public void testMaximumPossibleNodeMemoryTooSmall() {

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down Expand Up @@ -1164,7 +1164,7 @@ public void testPerceivedCapacityAndMaxFreeMemory() {

JobNodeSelector jobNodeSelector = new JobNodeSelector(
cs.build(),
shuffled(cs.nodes().getAllNodes()),
shuffled(cs.nodes()),
job.getId(),
MlTasks.JOB_TASK_NAME,
memoryTracker,
Expand Down
Loading