Skip to content

Clarify when shard iterators get sorted #52633

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(source);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()), false);
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
}

private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
Expand All @@ -122,7 +122,7 @@ private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchSh
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted(shardComparator(shardsIts, minAndMaxes, order))
.map(ord -> shardsIts.get(ord))
.map(shardsIts::get)
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,10 @@ static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShards
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
return new GroupShardsIterator<>(shards);
return GroupShardsIterator.sortAndCreate(shards);
}

private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
Expand All @@ -572,8 +572,19 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
task,
searchRequest,
iter,
timeProvider,
connectionLookup,
clusterStateVersion,
aliasFilter,
concreteIndexBoosts,
indexRoutings,
listener,
false,
clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
private final List<ShardIt> iterators;

/**
* Constructs a enw GroupShardsIterator from the given list.
* Constructs a new sorted GroupShardsIterator from the given list. Items are sorted based on their natural ordering.
* @see PlainShardIterator#compareTo(ShardIterator)
* @see org.elasticsearch.action.search.SearchShardIterator#compareTo(ShardIterator)
*/
public GroupShardsIterator(List<ShardIt> iterators) {
this(iterators, true);
public static <ShardIt extends ShardIterator> GroupShardsIterator<ShardIt> sortAndCreate(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
return new GroupShardsIterator<>(iterators);
}

/**
* Constructs a new GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
public GroupShardsIterator(List<ShardIt> iterators) {
this.iterators = iterators;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState
set.add(iterator);
}
}
return new GroupShardsIterator<>(new ArrayList<>(set));
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
}

private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private GroupShardsIterator<ShardIterator> allSatisfyingPredicateShardsGrouped(S
}
}
}
return new GroupShardsIterator<>(set);
return GroupShardsIterator.sortAndCreate(set);
}

public ShardsIterator allShards(String[] indices) {
Expand Down Expand Up @@ -321,7 +321,7 @@ public GroupShardsIterator<ShardIterator> activePrimaryShardsGrouped(String[] in
}
}
}
return new GroupShardsIterator<>(set);
return GroupShardsIterator.sortAndCreate(set);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testSize() {
ShardId shardId = new ShardId(index, 1);
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 0)));
}
GroupShardsIterator iter = new GroupShardsIterator<>(list);
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
assertEquals(7, iter.totalSizeWith1ForEmpty());
assertEquals(5, iter.size());
assertEquals(6, iter.totalSize());
Expand Down Expand Up @@ -106,13 +106,24 @@ public void testIterate() {
}

Collections.shuffle(list, random());
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
{
GroupShardsIterator<ShardIterator> unsorted = new GroupShardsIterator<>(list);
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
List<ShardIterator> actualIterators = new ArrayList<>();
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
assertEquals(actualIterators, list);
}
{
GroupShardsIterator<ShardIterator> iter = GroupShardsIterator.sortAndCreate(list);
List<ShardIterator> actualIterators = new ArrayList<>();
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
CollectionUtil.timSort(actualIterators);
assertEquals(actualIterators, list);
}
CollectionUtil.timSort(actualIterators);
assertEquals(actualIterators, list);
}

public void testOrderingWithSearchShardIterators() {
Expand All @@ -123,31 +134,41 @@ public void testOrderingWithSearchShardIterators() {
String[] clusters = generateRandomStringArray(5, 10, false, false);
Arrays.sort(clusters);

List<SearchShardIterator> expected = new ArrayList<>();
List<SearchShardIterator> sorted = new ArrayList<>();
int numShards = randomIntBetween(1, 10);
for (int i = 0; i < numShards; i++) {
for (String index : indices) {
for (String uuid : uuids) {
ShardId shardId = new ShardId(index, uuid, i);
SearchShardIterator shardIterator = new SearchShardIterator(null, shardId,
GroupShardsIteratorTests.randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices());
expected.add(shardIterator);
sorted.add(shardIterator);
for (String cluster : clusters) {
SearchShardIterator remoteIterator = new SearchShardIterator(cluster, shardId,
GroupShardsIteratorTests.randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices());
expected.add(remoteIterator);
sorted.add(remoteIterator);
}
}
}
}

List<SearchShardIterator> shuffled = new ArrayList<>(expected);
List<SearchShardIterator> shuffled = new ArrayList<>(sorted);
Collections.shuffle(shuffled, random());
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = new GroupShardsIterator<>(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
{
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = new GroupShardsIterator<>(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
}
assertEquals(shuffled, actualIterators);
}
{
List<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = GroupShardsIterator.sortAndCreate(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
}
assertEquals(sorted, actualIterators);
}
assertEquals(expected, actualIterators);
}
}