Commit ef8b961

Initial impl of intra segment concurrency
- Added a LeafReaderContextPartition abstraction that holds a LeafReaderContext plus the range of doc ids it targets. A slice now points to specific subsets of segments, identified by their corresponding doc id ranges.
- Introduced an additional IndexSearcher#search method that takes LeafReaderContextPartition[] instead of LeafReaderContext[]. It calls `scorer.score(leafCollector, ctx.reader().getLiveDocs(), slice.minDocId, slice.maxDocId);`, providing the range of doc ids, in place of `scorer.score(leafCollector, ctx.reader().getLiveDocs());`, which would score all documents.
- Added overrides of the new protected IndexSearcher#search to the subclasses that require it.
- Adjusted TotalHitCountCollectorManager to hold state that tracks seen leaves and ensures consistent treatment of different partitions of the same leaf.
- Updated IndexSearcher#getSlices to split segments that go over maxDocsPerSlice into multiple partitions; this first iteration does not mix partitions and entire segments in the same slice.

Problems found:

1) IndexSearcher#count / TotalHitCountCollector rely on `Weight#count(LeafReaderContext)`, which now gets called multiple times against the same leaf and leads to over-counting of hits.
2) LRUQueryCache caches the return value of `Weight#count`. When we execute the same query against the same segment multiple times (as part of a single search call), the first time we do the actual counting for the docs that the first partition holds; subsequent times we should do the same and count the hits in each partition of the same segment, instead of retrieving the cached whole-segment count.
3) CheckHits verifies matches and may go outside the bounds of the doc id range of the current slice.

Both 1) and 2) have been addressed by tracking seen leaves in TotalHitCountCollectorManager, and extending the TotalHitCountCollector that newCollector returns according to that internal state.
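For orientation, here is a minimal, hypothetical usage sketch (not part of this commit). It relies only on public API touched by the change, and assumes `reader` is an open IndexReader and `executor` an Executor supplied by the caller:

// Hedged illustration: inspect the slices the searcher computes, which are now made of
// LeafReaderContextPartition instances rather than whole LeafReaderContext leaves.
IndexSearcher searcher = new IndexSearcher(reader, executor);
for (IndexSearcher.LeafSlice slice : searcher.getSlices()) {
  // getNumDocs() sums the doc id ranges of the partitions that make up the slice
  System.out.println(
      "slice targets " + slice.getNumDocs() + " docs across " + slice.leaves.length + " partition(s)");
}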
1 parent cc3b412 commit ef8b961

15 files changed: +495 -131 lines changed

lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java

Lines changed: 187 additions & 25 deletions
@@ -21,8 +21,10 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.function.Function;
@@ -233,7 +235,13 @@ public IndexSearcher(IndexReaderContext context, Executor executor) {
             ? leaves ->
                 leaves.isEmpty()
                     ? new LeafSlice[0]
-                    : new LeafSlice[] {new LeafSlice(new ArrayList<>(leaves))}
+                    : new LeafSlice[] {
+                      new LeafSlice(
+                          new ArrayList<>(
+                              leaves.stream()
+                                  .map(LeafReaderContextPartition::createForEntireSegment)
+                                  .toList()))
+                    }
             : this::slices;
     leafSlicesSupplier = new CachingLeafSlicesSupplier(slicesProvider, leafContexts);
   }
@@ -328,42 +336,65 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
   /** Static method to segregate LeafReaderContexts amongst multiple slices */
   public static LeafSlice[] slices(
       List<LeafReaderContext> leaves, int maxDocsPerSlice, int maxSegmentsPerSlice) {
+
+    // TODO this is a temporary hack to force testing against multiple leaf reader context slices.
+    // It must be reverted before merging.
+    // maxDocsPerSlice = 1;
+    // maxSegmentsPerSlice = 1;
+    // end hack
+
     // Make a copy so we can sort:
     List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);
 
     // Sort by maxDoc, descending:
-    Collections.sort(
-        sortedLeaves, Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
+    sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
 
-    final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>();
-    long docSum = 0;
-    List<LeafReaderContext> group = null;
+    final List<List<LeafReaderContextPartition>> groupedLeafPartitions = new ArrayList<>();
+    int currentSliceNumDocs = 0;
+    List<LeafReaderContextPartition> group = null;
     for (LeafReaderContext ctx : sortedLeaves) {
       if (ctx.reader().maxDoc() > maxDocsPerSlice) {
         assert group == null;
-        groupedLeaves.add(Collections.singletonList(ctx));
+        // if the segment does not fit in a single slice, we split it in multiple partitions of
+        // equal size
+        int numSlices = Math.ceilDiv(ctx.reader().maxDoc(), maxDocsPerSlice);
+        int numDocs = ctx.reader().maxDoc() / numSlices;
+        int maxDocId = numDocs;
+        int minDocId = 0;
+        for (int i = 0; i < numSlices - 1; i++) {
+          groupedLeafPartitions.add(
+              Collections.singletonList(
+                  LeafReaderContextPartition.createFromAndTo(ctx, minDocId, maxDocId)));
+          minDocId = maxDocId;
+          maxDocId += numDocs;
+        }
+        // the last slice gets all the remaining docs
+        groupedLeafPartitions.add(
+            Collections.singletonList(LeafReaderContextPartition.createFrom(ctx, minDocId)));
       } else {
         if (group == null) {
           group = new ArrayList<>();
-          group.add(ctx);
-
-          groupedLeaves.add(group);
-        } else {
-          group.add(ctx);
+          groupedLeafPartitions.add(group);
         }
-
-        docSum += ctx.reader().maxDoc();
-        if (group.size() >= maxSegmentsPerSlice || docSum > maxDocsPerSlice) {
+        group.add(LeafReaderContextPartition.createForEntireSegment(ctx));
+
+        currentSliceNumDocs += ctx.reader().maxDoc();
+        // We only split a segment when it does not fit entirely in a slice. We don't partition the
+        // segment that makes the current slice (which holds multiple segments) go over
+        // maxDocsPerSlice.
+        // This means that a slice either contains multiple entire segments, or a single partition
+        // of a segment.
+        if (group.size() >= maxSegmentsPerSlice || currentSliceNumDocs > maxDocsPerSlice) {
           group = null;
-          docSum = 0;
+          currentSliceNumDocs = 0;
        }
      }
    }
 
-    LeafSlice[] slices = new LeafSlice[groupedLeaves.size()];
+    LeafSlice[] slices = new LeafSlice[groupedLeafPartitions.size()];
     int upto = 0;
-    for (List<LeafReaderContext> currentLeaf : groupedLeaves) {
-      slices[upto] = new LeafSlice(currentLeaf);
+    for (List<LeafReaderContextPartition> currentGroup : groupedLeafPartitions) {
+      slices[upto] = new LeafSlice(currentGroup);
       ++upto;
     }
 
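As a worked example of the split above (the numbers are arbitrary, not from the patch): a segment with maxDoc = 250_000 and maxDocsPerSlice = 100_000 is divided into three partitions, and the last partition absorbs the rounding remainder.

// Illustrative arithmetic for the partitioning loop above:
int maxDoc = 250_000;
int maxDocsPerSlice = 100_000;
int numSlices = Math.ceilDiv(maxDoc, maxDocsPerSlice); // 3
int numDocs = maxDoc / numSlices;                       // 83_333 docs per partition
// resulting partitions:
//   createFromAndTo(ctx, 0, 83_333)
//   createFromAndTo(ctx, 83_333, 166_666)
//   createFrom(ctx, 166_666)   -> [166_666, maxDoc), i.e. the remaining 83_334 docs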
@@ -658,11 +689,11 @@ private <C extends Collector, T> T search(
       }
       final List<Callable<C>> listTasks = new ArrayList<>(leafSlices.length);
       for (int i = 0; i < leafSlices.length; ++i) {
-        final LeafReaderContext[] leaves = leafSlices[i].leaves;
+        final LeafReaderContextPartition[] leaves = leafSlices[i].leaves;
         final C collector = collectors.get(i);
         listTasks.add(
             () -> {
-              search(Arrays.asList(leaves), weight, collector);
+              search(leaves, weight, collector);
               return collector;
             });
       }
@@ -671,6 +702,52 @@ private <C extends Collector, T> T search(
       }
     }
   }
 
+  // TODO this is a copy of the existing search protected method that takes a list of leaf reader
+  // contexts
+  // perhaps more methods need to be switched to use it in place of the other one?
+  protected void search(LeafReaderContextPartition[] leaves, Weight weight, Collector collector)
+      throws IOException {
+
+    collector.setWeight(weight);
+
+    for (LeafReaderContextPartition slice : leaves) { // search each subreader partition
+      LeafReaderContext ctx = slice.ctx;
+      final LeafCollector leafCollector;
+      try {
+        leafCollector = collector.getLeafCollector(ctx);
+      } catch (
+          @SuppressWarnings("unused")
+          CollectionTerminatedException e) {
+        // there is no doc of interest in this reader context
+        // continue with the following leaf
+        continue;
+      }
+
+      // TODO we are pulling bulk scorer twice for the same ctx. will that work?
+      BulkScorer scorer = weight.bulkScorer(ctx);
+      if (scorer != null) {
+        if (queryTimeout != null) {
+          scorer = new TimeLimitingBulkScorer(scorer, queryTimeout);
+        }
+        try {
+          scorer.score(leafCollector, ctx.reader().getLiveDocs(), slice.minDocId, slice.maxDocId);
+        } catch (
+            @SuppressWarnings("unused")
+            CollectionTerminatedException e) {
+          // collection was terminated prematurely
+          // continue with the following leaf
+        } catch (
+            @SuppressWarnings("unused")
+            TimeLimitingBulkScorer.TimeExceededException e) {
+          partialResult = true;
+        }
+      }
+      // Note: this is called if collection ran successfully, including the above special cases of
+      // CollectionTerminatedException and TimeExceededException, but no other exception.
+      leafCollector.finish();
+    }
+  }
+
   /**
    * Lower-level search API.
    *
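Because the new method is protected, subclasses can hook into per-slice execution. A hedged sketch of such an override follows; only the search(LeafReaderContextPartition[], Weight, Collector) signature comes from this commit, while the subclass name and its timing logic are invented for illustration:

import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Weight;

// Hypothetical subclass: time each slice, then delegate to the partition-aware search above.
class TimingIndexSearcher extends IndexSearcher {
  TimingIndexSearcher(IndexReader reader) {
    super(reader);
  }

  @Override
  protected void search(LeafReaderContextPartition[] partitions, Weight weight, Collector collector)
      throws IOException {
    long start = System.nanoTime();
    super.search(partitions, weight, collector);
    System.out.println(
        "slice with " + partitions.length + " partition(s) took " + (System.nanoTime() - start) + " ns");
  }
}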
@@ -685,6 +762,9 @@ private <C extends Collector, T> T search(
    * @throws TooManyClauses If a query would exceed {@link IndexSearcher#getMaxClauseCount()}
    *     clauses.
    */
+
+  // TODO legacy search method that does not support intra segment concurrency - what do we do about
+  // it?
   protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
       throws IOException {
 
@@ -873,11 +953,70 @@ public static class LeafSlice {
      *
      * @lucene.experimental
      */
-    public final LeafReaderContext[] leaves;
+    public final LeafReaderContextPartition[] leaves;
+
+    public LeafSlice(List<LeafReaderContextPartition> leafReaderContextPartitions) {
+      leafReaderContextPartitions.sort(Comparator.comparingInt(l -> l.ctx.docBase));
+      // TODO should we sort by minDocId too?
+      this.leaves = leafReaderContextPartitions.toArray(new LeafReaderContextPartition[0]);
+    }
 
-    public LeafSlice(List<LeafReaderContext> leavesList) {
-      Collections.sort(leavesList, Comparator.comparingInt(l -> l.docBase));
-      this.leaves = leavesList.toArray(new LeafReaderContext[0]);
+    /**
+     * Returns the total number of docs that a slice targets, by summing the number of docs that
+     * each of its leaf context partitions targets.
+     */
+    public int getNumDocs() {
+      return Arrays.stream(leaves)
+          .map(LeafReaderContextPartition::getNumDocs)
+          .reduce(Integer::sum)
+          .get();
+    }
+  }
+
+  /**
+   * Holds information about a specific leaf context and the corresponding range of doc ids to
+   * search within.
+   *
+   * @lucene.experimental
+   */
+  public static final class LeafReaderContextPartition {
+    private final int minDocId;
+    private final int maxDocId;
+    private final int numDocs;
+    public final LeafReaderContext ctx;
+
+    private LeafReaderContextPartition(
+        LeafReaderContext leafReaderContext, int minDocId, int maxDocId, int numDocs) {
+      this.ctx = leafReaderContext;
+      this.minDocId = minDocId;
+      this.maxDocId = maxDocId;
+      this.numDocs = numDocs;
+    }
+
+    /** Returns The number of docs that the doc id range of this partition targets */
+    public int getNumDocs() {
+      return numDocs;
+    }
+
+    /** Creates a partition of the provided leaf context that targets the entire segment */
+    public static LeafReaderContextPartition createForEntireSegment(LeafReaderContext ctx) {
+      return new LeafReaderContextPartition(
+          ctx, 0, DocIdSetIterator.NO_MORE_DOCS, ctx.reader().maxDoc());
+    }
+
+    /**
+     * Creates a partition of the provided leaf context that targets a subset of the entire
+     * segment, starting from the min doc id provided, until the end of the segment
+     */
+    public static LeafReaderContextPartition createFrom(LeafReaderContext ctx, int minDocId) {
+      return new LeafReaderContextPartition(
+          ctx, minDocId, DocIdSetIterator.NO_MORE_DOCS, ctx.reader().maxDoc() - minDocId);
+    }
+
+    public static LeafReaderContextPartition createFromAndTo(
+        LeafReaderContext ctx, int minDocId, int maxDocId) {
+      assert maxDocId != DocIdSetIterator.NO_MORE_DOCS;
+      return new LeafReaderContextPartition(ctx, minDocId, maxDocId, maxDocId - minDocId);
     }
   }
 
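For reference, a small illustrative example of the three factory methods above; `ctx` stands for any LeafReaderContext obtained from the reader, and the split point of 1_000 is arbitrary:

// Illustration only (assumes ctx.reader().maxDoc() >= 1_000):
IndexSearcher.LeafReaderContextPartition whole =
    IndexSearcher.LeafReaderContextPartition.createForEntireSegment(ctx);
IndexSearcher.LeafReaderContextPartition head =
    IndexSearcher.LeafReaderContextPartition.createFromAndTo(ctx, 0, 1_000);
IndexSearcher.LeafReaderContextPartition tail =
    IndexSearcher.LeafReaderContextPartition.createFrom(ctx, 1_000);
// whole.getNumDocs() == ctx.reader().maxDoc()
// head.getNumDocs()  == 1_000
// tail.getNumDocs()  == ctx.reader().maxDoc() - 1_000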
@@ -1007,10 +1146,33 @@ public LeafSlice[] get() {
             leafSlices =
                 Objects.requireNonNull(
                     sliceProvider.apply(leaves), "slices computed by the provider is null");
+            checkSlices(leafSlices);
           }
         }
       }
       return leafSlices;
     }
+
+    /**
+     * Enforce that there aren't multiple slices pointing to the same physical segment. It is a
+     * requirement that {@link Collector#getLeafCollector(LeafReaderContext)} gets called once per
+     * leaf context. Also, it does not make sense to partition a segment to then search those
+     * partitions as part of the same slice, because the goal of partitioning is parallel searching
+     * which happens at the slices level.
+     */
+    private static void checkSlices(LeafSlice[] leafSlices) {
+      for (LeafSlice leafSlice : leafSlices) {
+        Set<LeafReaderContext> distinctLeaves = new HashSet<>();
+        for (LeafReaderContextPartition leafPartition : leafSlice.leaves) {
+          distinctLeaves.add(leafPartition.ctx);
+        }
+        if (leafSlice.leaves.length != distinctLeaves.size()) {
+          throw new IllegalStateException(
+              "The same slice targets multiple partitions of the same leaf reader. "
+                  + "A segment should rather get partitioned to be searched concurrently from as many slices as the "
+                  + "number of partitions it is split into.");
+        }
+      }
+    }
   }
 }
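To illustrate the invariant enforced by checkSlices (a hypothetical setup, not code from the patch): two partitions of the same leaf belong in two different slices so they can be searched concurrently; a custom slices supplier that returns both inside one slice would trip the check.

// Illustration only: ctx is some LeafReaderContext, 500 is an arbitrary split point.
IndexSearcher.LeafSlice first =
    new IndexSearcher.LeafSlice(
        new ArrayList<>(List.of(IndexSearcher.LeafReaderContextPartition.createFromAndTo(ctx, 0, 500))));
IndexSearcher.LeafSlice second =
    new IndexSearcher.LeafSlice(
        new ArrayList<>(List.of(IndexSearcher.LeafReaderContextPartition.createFrom(ctx, 500))));
// Putting both partitions into a single LeafSlice instead would fail checkSlices with an
// IllegalStateException once the slices supplier returns it.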

lucene/core/src/java/org/apache/lucene/search/PointRangeQuery.java

Lines changed: 4 additions & 0 deletions
@@ -362,6 +362,9 @@ public long cost() {
         final IntersectVisitor visitor = getIntersectVisitor(result);
         long cost = -1;
 
+        // maybe allowing calling get multiple times, or clone the scorer to avoid duplication
+        // across multiple
+        // slices that point to the same segment
         @Override
         public Scorer get(long leadCost) throws IOException {
           if (values.getDocCount() == reader.maxDoc()
@@ -373,6 +376,7 @@ && cost() > reader.maxDoc() / 2) {
             final FixedBitSet result = new FixedBitSet(reader.maxDoc());
             result.set(0, reader.maxDoc());
             long[] cost = new long[] {reader.maxDoc()};
+            // bitset etc.
             values.intersect(getInverseIntersectVisitor(result, cost));
             final DocIdSetIterator iterator = new BitSetIterator(result, cost[0]);
             return new ConstantScoreScorer(score(), scoreMode, iterator);

lucene/core/src/java/org/apache/lucene/search/TotalHitCountCollector.java

Lines changed: 6 additions & 2 deletions
@@ -50,13 +50,17 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
       totalHits += leafCount;
       throw new CollectionTerminatedException();
     }
+    return createLeafCollector();
+  }
+
+  protected final LeafCollector createLeafCollector() {
     return new LeafCollector() {
 
       @Override
-      public void setScorer(Scorable scorer) throws IOException {}
+      public void setScorer(Scorable scorer) {}
 
       @Override
-      public void collect(int doc) throws IOException {
+      public void collect(int doc) {
         totalHits++;
       }
 
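The new protected createLeafCollector() hook is what the commit message's fix for problems 1) and 2) builds on: for a later partition of an already-seen leaf, hits must be counted by iterating matches rather than by re-applying the Weight#count shortcut. A hedged sketch of such a subclass follows; the class name is invented, and the real bookkeeping lives in TotalHitCountCollectorManager, which is not shown in this excerpt:

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.TotalHitCountCollector;

// Illustration only: always count by iterating matching docs, bypassing the Weight#count shortcut,
// so a later partition of an already-seen leaf cannot add a full-leaf count a second time.
class IteratingTotalHitCountCollector extends TotalHitCountCollector {
  @Override
  public LeafCollector getLeafCollector(LeafReaderContext context) {
    return createLeafCollector();
  }
}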