Skip to content

Commit ba7c6e6

Browse files
authored
Revert "Remove DiscoveryNodes#getAllNodes (#83538) (#83574)"
This reverts commit e6603b2.
1 parent b20f8f5 commit ba7c6e6

File tree

14 files changed

+104
-48
lines changed

14 files changed

+104
-48
lines changed

server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.util.AbstractCollection;
2626
import java.util.ArrayList;
27+
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.HashSet;
@@ -165,6 +166,14 @@ public ImmutableOpenMap<String, DiscoveryNode> getCoordinatingOnlyNodes() {
165166
return nodes.build();
166167
}
167168

169+
/**
170+
* Return all the nodes as a collection
171+
* @return
172+
*/
173+
public Collection<DiscoveryNode> getAllNodes() {
174+
return this;
175+
}
176+
168177
/**
169178
* Returns a stream of all nodes, with master nodes at the front
170179
*/

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
336336
// want to assign a persistent task to a node that will shortly be
337337
// leaving the cluster
338338
final List<DiscoveryNode> candidateNodes = currentState.nodes()
339+
.getAllNodes()
339340
.stream()
340341
.filter(dn -> isNodeShuttingDown(currentState, dn.getId()) == false)
341342
.collect(Collectors.toList());

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ public void testLocalShardIteratorFromPointInTime() {
12891289
} else {
12901290
// relocated or no longer assigned
12911291
relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
1292-
targetNode = randomFrom(clusterState.nodes()).getId();
1292+
targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
12931293
}
12941294
contexts.put(
12951295
new ShardId(indexMetadata.getIndex(), shardId),
@@ -1343,7 +1343,7 @@ public void testLocalShardIteratorFromPointInTime() {
13431343
anotherShardId,
13441344
new SearchContextIdForNode(
13451345
null,
1346-
randomFrom(clusterState.nodes()).getId(),
1346+
randomFrom(clusterState.nodes().getAllNodes()).getId(),
13471347
new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
13481348
)
13491349
);

server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ public void testTasksNotAssignedToShuttingDownNodes() {
627627
// Now that we have a bunch of tasks that need to be assigned, let's
628628
// mark half the nodes as shut down and make sure they do not have any
629629
// tasks assigned
630-
Collection<DiscoveryNode> allNodes = clusterState.nodes();
630+
Collection<DiscoveryNode> allNodes = clusterState.nodes().getAllNodes();
631631
Map<String, SingleNodeShutdownMetadata> shutdownMetadataMap = new HashMap<>();
632632
allNodes.stream()
633633
.limit(Math.floorDiv(allNodes.size(), 2))
@@ -816,6 +816,10 @@ private Assignment randomNodeAssignment(Collection<DiscoveryNode> nodes) {
816816
return Optional.ofNullable(randomFrom(nodes)).map(node -> new Assignment(node.getId(), "test assignment")).orElse(NO_NODE_FOUND);
817817
}
818818

819+
private Assignment randomNodeAssignment(DiscoveryNodes nodes) {
820+
return randomNodeAssignment(nodes.getAllNodes());
821+
}
822+
819823
private String dumpEvent(ClusterChangedEvent event) {
820824
return "nodes_changed: "
821825
+ event.nodesChanged()

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private void runAssignmentTest(
9292
clusterStateBuilder.nodes(nodesBuilder);
9393
final Assignment assignment = executor.getAssignment(
9494
mock(ShardFollowTask.class),
95-
clusterStateBuilder.nodes(),
95+
clusterStateBuilder.nodes().getAllNodes(),
9696
clusterStateBuilder.build()
9797
);
9898
consumer.accept(theSpecial, assignment);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ DiscoveryNode selectNodeForPolicyExecution(DiscoveryNodes discoNodes) {
160160
return discoNodes.getLocalNode();
161161
}
162162

163-
final var nodes = discoNodes.stream()
163+
final var nodes = discoNodes.getAllNodes()
164+
.stream()
164165
// filter out elected master node (which is the local node)
165166
.filter(discoNode -> discoNode.getId().equals(discoNodes.getMasterNodeId()) == false)
166167
// filter out dedicated master nodes

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDeploymentStatsAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ protected void doExecute(
143143
ClusterState latestState = clusterService.state();
144144
Set<String> nodesShuttingDown = TransportStartTrainedModelDeploymentAction.nodesShuttingDown(latestState);
145145
List<DiscoveryNode> nodes = latestState.getNodes()
146+
.getAllNodes()
146147
.stream()
147148
.filter(d -> nodesShuttingDown.contains(d.getId()) == false)
148149
.filter(StartTrainedModelDeploymentAction.TaskParams::mayAllocateToNode)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartTrainedModelDeploymentAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ public boolean test(ClusterState clusterState) {
441441
}
442442
Set<String> nodesShuttingDown = nodesShuttingDown(clusterState);
443443
List<DiscoveryNode> nodes = clusterState.nodes()
444+
.getAllNodes()
444445
.stream()
445446
.filter(d -> nodesShuttingDown.contains(d.getId()) == false)
446447
.filter(TaskParams::mayAllocateToNode)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/allocation/TrainedModelAllocationClusterService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ ClusterState createModelAllocation(ClusterState currentState, StartTrainedModelD
253253

254254
Set<String> shuttingDownNodes = nodesShuttingDown(currentState);
255255
Map<String, String> nodeToReason = new TreeMap<>();
256-
for (DiscoveryNode node : currentState.getNodes()) {
256+
for (DiscoveryNode node : currentState.getNodes().getAllNodes()) {
257257
if (StartTrainedModelDeploymentAction.TaskParams.mayAllocateToNode(node) && shuttingDownNodes.contains(node.getId()) == false) {
258258
Optional<String> maybeError = nodeHasCapacity(currentState, params, node);
259259
if (maybeError.isPresent()) {
@@ -357,6 +357,7 @@ ClusterState addRemoveAllocationNodes(ClusterState currentState) {
357357
final TrainedModelAllocationMetadata.Builder builder = TrainedModelAllocationMetadata.builder(currentState);
358358
Set<String> shuttingDownNodes = nodesShuttingDown(currentState);
359359
Map<String, DiscoveryNode> currentEligibleNodes = currentState.getNodes()
360+
.getAllNodes()
360361
.stream()
361362
// TODO: Change when we update `mayAllocateToNode`
362363
.filter(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState)
219219
validateJobAndId(jobId, job);
220220
// If we already know that we can't find an ml node because all ml nodes are running at capacity or
221221
// simply because there are no ml nodes in the cluster then we fail quickly here:
222-
PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes(), clusterState);
222+
PersistentTasksCustomMetadata.Assignment assignment = getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
223223
if (assignment.equals(AWAITING_UPGRADE)) {
224224
throw makeCurrentlyBeingUpgradedException(logger, params.getJobId());
225225
}

0 commit comments

Comments
 (0)