Skip to content

Commit 3da7371

Browse files
authored
Revert "Make RoutingNodes behave like a collection (#83540) (#83573)" (#83892)
This reverts commit 73c15ef.
1 parent b20f8f5 commit 3da7371

File tree

8 files changed

+41
-64
lines changed

8 files changed

+41
-64
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,7 @@ public ClusterState measureAllocation() {
156156
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
157157
clusterState = strategy.applyStartedShards(
158158
clusterState,
159-
clusterState.getRoutingNodes()
160-
.stream()
159+
StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
161160
.flatMap(shardRoutings -> StreamSupport.stream(shardRoutings.spliterator(), false))
162161
.filter(ShardRouting::initializing)
163162
.collect(Collectors.toList())

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicReference;
3636
import java.util.stream.Collectors;
37+
import java.util.stream.StreamSupport;
3738

3839
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING;
3940
import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
@@ -73,15 +74,10 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
7374
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
7475
}
7576

76-
final List<String> nodeIds = client().admin()
77-
.cluster()
78-
.prepareState()
79-
.get()
80-
.getState()
81-
.getRoutingNodes()
82-
.stream()
83-
.map(RoutingNode::nodeId)
84-
.collect(Collectors.toList());
77+
final List<String> nodeIds = StreamSupport.stream(
78+
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
79+
false
80+
).map(RoutingNode::nodeId).collect(Collectors.toList());
8581

8682
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
8783
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
@@ -153,15 +149,10 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {
153149
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
154150
}
155151

156-
final List<String> nodeIds = client().admin()
157-
.cluster()
158-
.prepareState()
159-
.get()
160-
.getState()
161-
.getRoutingNodes()
162-
.stream()
163-
.map(RoutingNode::nodeId)
164-
.collect(Collectors.toList());
152+
final List<String> nodeIds = StreamSupport.stream(
153+
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
154+
false
155+
).map(RoutingNode::nodeId).collect(Collectors.toList());
165156

166157
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
167158
clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
@@ -279,15 +270,10 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception
279270
)
280271
);
281272

282-
final List<String> nodeIds = client().admin()
283-
.cluster()
284-
.prepareState()
285-
.get()
286-
.getState()
287-
.getRoutingNodes()
288-
.stream()
289-
.map(RoutingNode::nodeId)
290-
.collect(Collectors.toList());
273+
final List<String> nodeIds = StreamSupport.stream(
274+
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
275+
false
276+
).map(RoutingNode::nodeId).collect(Collectors.toList());
291277

292278
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
293279

@@ -343,15 +329,10 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
343329

344330
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
345331

346-
final List<String> nodeIds = client().admin()
347-
.cluster()
348-
.prepareState()
349-
.get()
350-
.getState()
351-
.getRoutingNodes()
352-
.stream()
353-
.map(RoutingNode::nodeId)
354-
.collect(Collectors.toList());
332+
final List<String> nodeIds = StreamSupport.stream(
333+
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
334+
false
335+
).map(RoutingNode::nodeId).collect(Collectors.toList());
355336

356337
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
357338
assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1));
@@ -456,15 +437,10 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception
456437
)
457438
);
458439

459-
final List<String> nodeIds = client().admin()
460-
.cluster()
461-
.prepareState()
462-
.get()
463-
.getState()
464-
.getRoutingNodes()
465-
.stream()
466-
.map(RoutingNode::nodeId)
467-
.collect(Collectors.toList());
440+
final List<String> nodeIds = StreamSupport.stream(
441+
client().admin().cluster().prepareState().get().getState().getRoutingNodes().spliterator(),
442+
false
443+
).map(RoutingNode::nodeId).collect(Collectors.toList());
468444

469445
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));
470446

server/src/internalClusterTest/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.Set;
2727
import java.util.stream.Collectors;
28+
import java.util.stream.StreamSupport;
2829

2930
import static org.elasticsearch.client.internal.Requests.clusterHealthRequest;
3031
import static org.elasticsearch.client.internal.Requests.createIndexRequest;
@@ -238,7 +239,9 @@ private String getLocalNodeId(String name) {
238239
}
239240

240241
private void assertNodesPresent(RoutingNodes routingNodes, String... nodes) {
241-
final Set<String> keySet = routingNodes.stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
242+
final Set<String> keySet = StreamSupport.stream(routingNodes.spliterator(), false)
243+
.map(RoutingNode::nodeId)
244+
.collect(Collectors.toSet());
242245
assertThat(keySet, containsInAnyOrder(nodes));
243246
}
244247
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.index.Index;
2828
import org.elasticsearch.index.shard.ShardId;
2929

30-
import java.util.AbstractCollection;
3130
import java.util.ArrayDeque;
3231
import java.util.ArrayList;
3332
import java.util.Collections;
@@ -45,6 +44,7 @@
4544
import java.util.Set;
4645
import java.util.function.Predicate;
4746
import java.util.stream.Collectors;
47+
import java.util.stream.StreamSupport;
4848

4949
/**
5050
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
@@ -60,7 +60,7 @@
6060
* <li> {@link #failShard} fails/cancels an assigned shard.
6161
* </ul>
6262
*/
63-
public class RoutingNodes extends AbstractCollection<RoutingNode> {
63+
public class RoutingNodes implements Iterable<RoutingNode> {
6464

6565
private final Map<String, RoutingNode> nodesToShards;
6666

@@ -299,7 +299,10 @@ public Set<String> getAttributeValues(String attributeName) {
299299
: Thread.currentThread().getName() + " should be the master service thread";
300300
return attributeValuesByAttribute.computeIfAbsent(
301301
attributeName,
302-
ignored -> stream().map(r -> r.node().getAttributes().get(attributeName)).filter(Objects::nonNull).collect(Collectors.toSet())
302+
ignored -> StreamSupport.stream(this.spliterator(), false)
303+
.map(r -> r.node().getAttributes().get(attributeName))
304+
.filter(Objects::nonNull)
305+
.collect(Collectors.toSet())
303306
);
304307
}
305308

@@ -866,7 +869,6 @@ private ShardRouting movePrimaryToUnassignedAndDemoteToReplica(ShardRouting shar
866869
/**
867870
* Returns the number of routing nodes
868871
*/
869-
@Override
870872
public int size() {
871873
return nodesToShards.size();
872874
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.function.Supplier;
4848
import java.util.stream.Collectors;
4949
import java.util.stream.Stream;
50+
import java.util.stream.StreamSupport;
5051

5152
/**
5253
* Listens for a node to go over the high watermark and kicks off an empty
@@ -374,8 +375,7 @@ public void onNewInfo(ClusterInfo info) {
374375
}
375376

376377
// Generate a map of node name to ID so we can use it to look up node replacement targets
377-
final Map<String, String> nodeNameToId = state.getRoutingNodes()
378-
.stream()
378+
final Map<String, String> nodeNameToId = StreamSupport.stream(state.getRoutingNodes().spliterator(), false)
379379
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
380380

381381
// Calculate both the source node id and the target node id of a "replace" type shutdown

server/src/test/java/org/elasticsearch/cluster/routing/allocation/AwarenessAllocationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.HashMap;
3535
import java.util.Map;
3636
import java.util.function.UnaryOperator;
37+
import java.util.stream.StreamSupport;
3738

3839
import static java.util.Collections.emptyMap;
3940
import static java.util.Collections.singletonList;
@@ -1097,8 +1098,7 @@ private void testExplanation(
10971098
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
10981099
);
10991100

1100-
final RoutingNode emptyNode = clusterState.getRoutingNodes()
1101-
.stream()
1101+
final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
11021102
.filter(RoutingNode::isEmpty)
11031103
.findFirst()
11041104
.orElseThrow(AssertionError::new);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/SameShardRoutingTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.util.Collections;
3434
import java.util.List;
35+
import java.util.stream.StreamSupport;
3536

3637
import static java.util.Collections.emptyMap;
3738
import static java.util.Collections.singletonList;
@@ -204,14 +205,12 @@ public void testSameHostCheckWithExplain() {
204205
new ClusterSettings(sameHostSetting, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
205206
);
206207

207-
final RoutingNode emptyNode = clusterState.getRoutingNodes()
208-
.stream()
208+
final RoutingNode emptyNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
209209
.filter(node -> node.getByShardId(unassignedShard.shardId()) == null)
210210
.findFirst()
211211
.orElseThrow(AssertionError::new);
212212

213-
final RoutingNode otherNode = clusterState.getRoutingNodes()
214-
.stream()
213+
final RoutingNode otherNode = StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false)
215214
.filter(node -> node != emptyNode)
216215
.findFirst()
217216
.orElseThrow(AssertionError::new);

x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,7 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
344344
return false;
345345
}
346346
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
347-
Set<Decision.Type> decisionTypes = allocation.routingNodes()
348-
.stream()
347+
Set<Decision.Type> decisionTypes = StreamSupport.stream(allocation.routingNodes().spliterator(), false)
349348
.map(
350349
node -> dataTierAllocationDecider.shouldFilter(
351350
indexMetadata,
@@ -370,8 +369,7 @@ boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
370369
allocation.debugDecision(true);
371370
try {
372371
// check that it does not belong on any existing node, i.e., there must be only a tier like reason it cannot be allocated
373-
return allocation.routingNodes()
374-
.stream()
372+
return StreamSupport.stream(allocation.routingNodes().spliterator(), false)
375373
.anyMatch(node -> isFilterTierOnlyDecision(allocationDeciders.canAllocate(shard, node, allocation), indexMetadata));
376374
} finally {
377375
allocation.debugDecision(false);

0 commit comments

Comments
 (0)