Skip to content

Add support of getting a Java stream on ImmutableOpenMap #76921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a68f350
Add support of getting a Java stream on ImmutableOpenMap
arteam Aug 25, 2021
e55ffe0
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Aug 30, 2021
490a13c
Merge branch 'master' into add-stream-to-immutable-openmap
arteam Sep 2, 2021
10af100
Use fixed spliterator
arteam Sep 2, 2021
c581f9d
Make ImmutableOpenMap stream safe
arteam Sep 2, 2021
7023f6c
stream->streamBuilder
arteam Sep 3, 2021
195d207
Create spliterator on the fly by copying cursor to Map.Entry
arteam Sep 3, 2021
658f92f
Add DISTINCT characteristic
arteam Sep 3, 2021
b5b82bd
Use Map.Entry instead of internal Entry with public fields
arteam Sep 3, 2021
24587b5
Fix spotless warning
arteam Sep 3, 2021
c231712
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 3, 2021
e3c6a48
Migrate to Map.Entry::getValue
arteam Sep 3, 2021
cc89ef3
Add a test for stream operations on a random map
arteam Sep 3, 2021
f2b200d
Fix checkstyle warnings
arteam Sep 3, 2021
729a923
Use predictable Random instead of ThreadLocalRandom
arteam Sep 3, 2021
74ec0b6
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 3, 2021
d5217ff
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 8, 2021
3e8276f
Update server/src/test/java/org/elasticsearch/common/collect/Immutabl…
arteam Sep 8, 2021
6fc0c5e
Use safe accumulator and randomize the limit
arteam Sep 8, 2021
73a6a76
Checkstyle
arteam Sep 8, 2021
1c6d1d2
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 8, 2021
bcb5b5e
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 8, 2021
8bdd44c
Update server/src/test/java/org/elasticsearch/common/collect/Immutabl…
arteam Sep 8, 2021
899f7b3
Create not only random numbers, but also a random amount of numbers
arteam Sep 8, 2021
879da8b
Merge branch 'master' into add-stream-to-immutable-openmap
elasticmachine Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
Expand Down Expand Up @@ -159,8 +158,8 @@ public void testRelocation() throws Exception {
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertThat(resp.pointInTimeId(), equalTo(pitId));
final Set<String> dataNodes = StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getId()).collect(Collectors.toSet());
final Set<String> dataNodes = clusterService().state().nodes().getDataNodes().stream()
.map(e -> e.getValue().getId()).collect(Collectors.toSet());
final List<String> excludedNodes = randomSubsetOf(2, dataNodes);
assertAcked(client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", String.join(",", excludedNodes)).build()));
Expand Down Expand Up @@ -313,9 +312,8 @@ public void testCanMatch() throws Exception {

public void testPartialResults() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
final List<String> dataNodes =
StreamSupport.stream(internalCluster().clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getName())
final List<String> dataNodes = internalCluster().clusterService().state().nodes().getDataNodes().stream()
.map(e -> e.getValue().getName())
.collect(Collectors.toList());
final String assignedNodeForIndex1 = randomFrom(dataNodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.ReplicaShardAllocatorIT;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
Expand Down Expand Up @@ -124,7 +124,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -1283,8 +1282,8 @@ public void testOngoingRecoveryAndMasterFailOver() throws Exception {

public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
List<String> nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(node -> node.value.getName()).collect(Collectors.toSet()));
List<String> nodes = randomSubsetOf(2, clusterService().state().nodes().getDataNodes().stream()
.map(node -> node.getValue().getName()).collect(Collectors.toSet()));
String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Get index action.
Expand Down Expand Up @@ -62,8 +62,9 @@ protected void doMasterOperation(Task task, final GetIndexRequest request, Strin
ImmutableOpenMap<String, Settings> settings = ImmutableOpenMap.of();
ImmutableOpenMap<String, Settings> defaultSettings = ImmutableOpenMap.of();
ImmutableOpenMap<String, String> dataStreams = ImmutableOpenMap.<String, String>builder()
.putAll(StreamSupport.stream(state.metadata().findDataStreams(concreteIndices).spliterator(), false)
.collect(Collectors.toMap(k -> k.key, v -> v.value.getName()))).build();
.putAll(state.metadata().findDataStreams(concreteIndices).stream()
.collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getName())))
.build();
Feature[] features = request.features();
boolean doneAliases = false;
boolean doneMappings = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -47,7 +48,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.StreamSupport;

/**
* Represents the current state of the cluster.
Expand Down Expand Up @@ -567,7 +567,7 @@ public Builder removeCustom(String type) {
}

public Builder customs(ImmutableOpenMap<String, Custom> customs) {
StreamSupport.stream(customs.spliterator(), false).forEach(cursor -> Objects.requireNonNull(cursor.value, cursor.key));
customs.stream().forEach(entry -> Objects.requireNonNull(entry.getValue(), entry.getKey()));
this.customs.putAll(customs);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
Expand Down Expand Up @@ -72,7 +73,6 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;
Expand Down Expand Up @@ -1313,7 +1313,7 @@ public Builder removeCustom(String type) {
}

public Builder customs(ImmutableOpenMap<String, Custom> customs) {
StreamSupport.stream(customs.spliterator(), false).forEach(cursor -> Objects.requireNonNull(cursor.value, cursor.key));
customs.stream().forEach(entry -> Objects.requireNonNull(entry.getValue(), entry.getKey()));
this.customs.putAll(customs);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -167,7 +168,8 @@ public Collection<DiscoveryNode> getAllNodes() {
* Returns a stream of all nodes, with master nodes at the front
*/
public Stream<DiscoveryNode> mastersFirstStream() {
return Stream.concat(StreamSupport.stream(masterNodes.spliterator(), false).map(cur -> cur.value),
return Stream.concat(
masterNodes.stream().map(Map.Entry::getValue),
StreamSupport.stream(this.spliterator(), false).filter(n -> n.isMasterNode() == false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -38,13 +39,13 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Listens for a node to go over the high watermark and kicks off an empty
Expand Down Expand Up @@ -294,9 +295,8 @@ public void onNewInfo(ClusterInfo info) {
logger.trace("no reroute required");
listener.onResponse(null);
}
final Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
.spliterator(), false)
.map(c -> c.key)
final Set<String> indicesToAutoRelease = state.routingTable().indicesRouting().stream()
.map(Map.Entry::getKey)
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
import com.carrotsearch.hppc.predicates.ObjectPredicate;
import com.carrotsearch.hppc.procedures.ObjectObjectProcedure;

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* An immutable map implementation based on open hash map.
Expand All @@ -36,7 +42,6 @@ public final class ImmutableOpenMap<KType, VType> implements Iterable<ObjectObje
private ImmutableOpenMap(ObjectObjectHashMap<KType, VType> map) {
this.map = map;
}

/**
* @return Returns the value associated with the given key or the default value
* for the key type, if the key is not associated with any value.
Expand Down Expand Up @@ -163,6 +168,26 @@ public void remove() {
};
}

/**
* Returns a sequential unordered stream of the map entries.
*
* @return a {@link Stream} of the map entries as {@link Map.Entry}
*/
public Stream<Map.Entry<KType, VType>> stream() {
final Iterator<ObjectObjectCursor<KType, VType>> mapIterator = map.iterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(map.size(), Spliterator.SIZED | Spliterator.DISTINCT) {
@Override
public boolean tryAdvance(Consumer<? super Map.Entry<KType, VType>> action) {
if (mapIterator.hasNext() == false) {
return false;
}
ObjectObjectCursor<KType, VType> cursor = mapIterator.next();
action.accept(new AbstractMap.SimpleImmutableEntry<>(cursor.key, cursor.value));
return true;
}
}, false);
}

@Override
public String toString() {
return map.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.gateway.AsyncShardFetch.Lister;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -42,7 +42,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GatewayAllocator implements ExistingShardsAllocator {

Expand Down Expand Up @@ -168,8 +167,8 @@ public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes)) {
final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
.map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
final Set<String> newEphemeralIds = nodes.getDataNodes().stream()
.map(node -> node.getValue().getEphemeralId()).collect(Collectors.toSet());
// Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node
// drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
// ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ private GroupedActionListener<ActionResponse> createGroupedListener(final RestRe
public void onResponse(final Collection<ActionResponse> responses) {
try {
GetSettingsResponse settingsResponse = extractResponse(responses, GetSettingsResponse.class);
Map<String, Settings> indicesSettings = StreamSupport.stream(settingsResponse.getIndexToSettings().spliterator(), false)
.collect(Collectors.toMap(cursor -> cursor.key, cursor -> cursor.value));
Map<String, Settings> indicesSettings = settingsResponse.getIndexToSettings().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

ClusterStateResponse stateResponse = extractResponse(responses, ClusterStateResponse.class);
Map<String, IndexMetadata> indicesStates =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.collect;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

public class ImmutableOpenMapTests extends ESTestCase {

ImmutableOpenMap<String, String> regionCurrencySymbols = ImmutableOpenMap.<String, String>builder()
.fPut("Japan", "¥")
.fPut("USA", "$")
.fPut("EU", "€")
.fPut("UK", "£")
.fPut("Korea", "₩")
.build();

public void testStreamOperationsAreSupported() {
assertThat(regionCurrencySymbols.stream().filter(e -> e.getKey().startsWith("U")).map(Map.Entry::getValue)
.collect(Collectors.toSet()), equalTo(Set.of("£", "$")));
}

public void testSortedStream() {
assertThat(regionCurrencySymbols.stream().sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue).collect(Collectors.toList()),
equalTo(List.of("€", "¥", "₩", "£", "$")));
}

public void testStreamOperationsOnRandomMap() {
ImmutableOpenMap<Long, String> map = Randomness.get().longs(randomIntBetween(1, 1000))
.mapToObj(e -> Tuple.tuple(e, randomAlphaOfLength(8)))
.collect(() -> ImmutableOpenMap.<Long, String>builder(), (builder, t) -> builder.fPut(t.v1(), t.v2()),
ImmutableOpenMap.Builder::putAll)
.build();

int limit = randomIntBetween(0, map.size());
Map<Long, List<String>> collectedViaStreams = map.stream()
.filter(e -> e.getKey() > 0)
.sorted(Map.Entry.comparingByKey())
.limit(limit)
.collect(Collectors.groupingBy(e -> e.getKey() % 2, Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

Map<Long, String> sortedMap = new TreeMap<>();
for (ObjectObjectCursor<Long, String> cursor : map) {
if (cursor.key > 0) {
sortedMap.put(cursor.key, cursor.value);
}
}
int i = 0;
Map<Long, List<String>> collectedIteratively = new HashMap<>();
for (Map.Entry<Long, String> e : sortedMap.entrySet()) {
if (i++ >= limit) {
break;
}
collectedIteratively.computeIfAbsent(e.getKey() % 2, k -> new ArrayList<>()).add(e.getValue());
}

assertThat(collectedViaStreams, equalTo(collectedIteratively));
}

public void testEmptyStreamWorks() {
assertThat(ImmutableOpenMap.of().stream().count(), equalTo(0L));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a randomized test too, generating the map input of varying sizes and verifying that streaming though it works and produces the right results. Perhaps also randomly make it sort and/or parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add a couple of random tests and check if something gets broken!

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -214,10 +213,13 @@ public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception

public void testRetryPointInTime() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(1);
final List<String> dataNodes = StreamSupport.stream(
internalCluster().clusterService().state().nodes().getDataNodes().spliterator(),
false
).map(e -> e.value.getName()).collect(Collectors.toList());
final List<String> dataNodes = internalCluster().clusterService()
.state()
.nodes()
.getDataNodes()
.stream()
.map(e -> e.getValue().getName())
.collect(Collectors.toList());
final String assignedNode = randomFrom(dataNodes);
final String indexName = "test";
assertAcked(
Expand Down
Loading