Skip to content

Commit 138b72e

Browse files
committed
Add concurrency across segments, reducing total time from 300+ seconds to 100+ seconds
1 parent 09ae809 commit 138b72e

File tree

9 files changed

+185
-65
lines changed

9 files changed

+185
-65
lines changed

lucene/backward-codecs/src/test/org/apache/lucene/backward_index/TestManyPointsInOldIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void testCheckOldIndex() throws IOException {
7070
dir.setCheckIndexOnClose(false);
7171

7272
// ... because we check ourselves here:
73-
TestUtil.checkIndex(dir, false, true, null);
73+
TestUtil.checkIndex(dir, false, true, true, null);
7474
dir.close();
7575
}
7676
}

lucene/core/src/java/org/apache/lucene/index/CheckIndex.java

Lines changed: 159 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
2020

21+
import java.io.ByteArrayOutputStream;
2122
import java.io.Closeable;
2223
import java.io.IOException;
2324
import java.io.PrintStream;
25+
import java.nio.charset.StandardCharsets;
2426
import java.nio.file.Path;
2527
import java.nio.file.Paths;
2628
import java.text.NumberFormat;
@@ -31,9 +33,14 @@
3133
import java.util.List;
3234
import java.util.Locale;
3335
import java.util.Map;
36+
import java.util.concurrent.Callable;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CompletionException;
39+
import java.util.concurrent.ExecutionException;
3440
import java.util.concurrent.ExecutorService;
3541
import java.util.concurrent.Executors;
3642
import java.util.concurrent.TimeUnit;
43+
import java.util.function.Supplier;
3744
import org.apache.lucene.codecs.Codec;
3845
import org.apache.lucene.codecs.DocValuesProducer;
3946
import org.apache.lucene.codecs.NormsProducer;
@@ -80,7 +87,6 @@
8087
*/
8188
public final class CheckIndex implements Closeable {
8289

83-
private static final int MAX_PER_SEGMENT_CONCURRENCY = 11;
8490
private PrintStream infoStream;
8591
private Directory dir;
8692
private Lock writeLock;
@@ -521,22 +527,29 @@ public Status checkIndex() throws IOException {
521527
* quite a long time to run.
522528
*/
523529
public Status checkIndex(List<String> onlySegments) throws IOException {
524-
ExecutorService executorService =
525-
Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index"));
530+
ExecutorService executorService = null;
531+
532+
if (threadCount > 0) {
533+
executorService =
534+
Executors.newFixedThreadPool(threadCount, new NamedThreadFactory("async-check-index"));
535+
}
536+
526537
msg(infoStream, "Checking index with async threadCount: " + threadCount);
527538
try {
528539
return checkIndex(onlySegments, executorService);
529540
} finally {
530-
executorService.shutdown();
531-
try {
532-
executorService.awaitTermination(5, TimeUnit.SECONDS);
533-
} catch (InterruptedException e) {
534-
msg(
535-
infoStream,
536-
"ERROR: Interrupted exception occurred when shutting down executor service");
537-
if (infoStream != null) e.printStackTrace(infoStream);
538-
} finally {
539-
executorService.shutdownNow();
541+
if (executorService != null) {
542+
executorService.shutdown();
543+
try {
544+
executorService.awaitTermination(5, TimeUnit.SECONDS);
545+
} catch (InterruptedException e) {
546+
msg(
547+
infoStream,
548+
"ERROR: Interrupted exception occurred when shutting down executor service");
549+
if (infoStream != null) e.printStackTrace(infoStream);
550+
} finally {
551+
executorService.shutdownNow();
552+
}
540553
}
541554
}
542555
}
@@ -667,34 +680,101 @@ public Status checkIndex(List<String> onlySegments, ExecutorService executorServ
667680
result.newSegments.clear();
668681
result.maxSegmentName = -1;
669682

670-
for (int i = 0; i < numSegments; i++) {
671-
final SegmentCommitInfo info = sis.info(i);
672-
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
673-
if (segmentName > result.maxSegmentName) {
674-
result.maxSegmentName = segmentName;
683+
// checks segments sequentially
684+
if (executorService == null) {
685+
for (int i = 0; i < numSegments; i++) {
686+
final SegmentCommitInfo info = sis.info(i);
687+
updateMaxSegmentName(result, info);
688+
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
689+
continue;
690+
}
691+
692+
msg(
693+
infoStream,
694+
(1 + i)
695+
+ " of "
696+
+ numSegments
697+
+ ": name="
698+
+ info.info.name
699+
+ " maxDoc="
700+
+ info.info.maxDoc());
701+
Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream);
702+
703+
processSegmentInfoStatusResult(result, info, segmentInfoStatus);
675704
}
676-
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
677-
continue;
705+
} else {
706+
List<ByteArrayOutputStream> outputs = new ArrayList<>();
707+
List<CompletableFuture<Status.SegmentInfoStatus>> futures = new ArrayList<>();
708+
709+
// checks segments concurrently
710+
for (int i = 0; i < numSegments; i++) {
711+
final SegmentCommitInfo info = sis.info(i);
712+
updateMaxSegmentName(result, info);
713+
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
714+
continue;
715+
}
716+
717+
SegmentInfos finalSis = sis;
718+
719+
ByteArrayOutputStream output = new ByteArrayOutputStream();
720+
PrintStream stream;
721+
if (i > 0) {
722+
// buffer the messages for segments starting from the 2nd one so that they can later be
723+
// printed in order
724+
stream = new PrintStream(output, true, IOUtils.UTF_8);
725+
} else {
726+
// optimize for the first segment: print its messages in real time
727+
stream = infoStream;
728+
}
729+
msg(
730+
stream,
731+
(1 + i)
732+
+ " of "
733+
+ numSegments
734+
+ ": name="
735+
+ info.info.name
736+
+ " maxDoc="
737+
+ info.info.maxDoc());
738+
739+
outputs.add(output);
740+
futures.add(
741+
runAsyncSegmentCheck(() -> testSegment(finalSis, info, stream), executorService));
678742
}
679-
msg(
680-
infoStream,
681-
(1 + i)
682-
+ " of "
683-
+ numSegments
684-
+ ": name="
685-
+ info.info.name
686-
+ " maxDoc="
687-
+ info.info.maxDoc());
688-
689-
Status.SegmentInfoStatus segmentInfoStatus = testSegment(sis, info, infoStream);
690-
691-
result.segmentInfos.add(segmentInfoStatus);
692-
if (segmentInfoStatus.error != null) {
693-
result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
694-
result.numBadSegments++;
695-
} else {
696-
// Keeper
697-
result.newSegments.add(info.clone());
743+
744+
for (int i = 0; i < numSegments; i++) {
745+
SegmentCommitInfo info = sis.info(i);
746+
if (onlySegments != null && !onlySegments.contains(info.info.name)) {
747+
continue;
748+
}
749+
750+
ByteArrayOutputStream output = outputs.get(i);
751+
752+
// print segment results in order
753+
Status.SegmentInfoStatus segmentInfoStatus = null;
754+
try {
755+
segmentInfoStatus = futures.get(i).get();
756+
} catch (InterruptedException e) {
757+
// the segment test output should come before interrupted exception message that follows,
758+
// hence it's not emitted from finally clause
759+
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());
760+
msg(
761+
infoStream,
762+
"ERROR: Interrupted exception occurred when getting segment check result for segment "
763+
+ info.info.name);
764+
if (infoStream != null) e.printStackTrace(infoStream);
765+
} catch (ExecutionException e) {
766+
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());
767+
768+
assert failFast;
769+
throw new CheckIndexException("Segment " + info.info.name + " check failed.", e);
770+
}
771+
772+
if (i > 0) {
773+
// first segment output already printed by infoStream
774+
infoStream.println(output.toString(StandardCharsets.UTF_8).stripTrailing());
775+
}
776+
777+
processSegmentInfoStatusResult(result, info, segmentInfoStatus);
698778
}
699779
}
700780

@@ -731,6 +811,42 @@ public Status checkIndex(List<String> onlySegments, ExecutorService executorServ
731811
return result;
732812
}
733813

814+
private void updateMaxSegmentName(Status result, SegmentCommitInfo info) {
815+
long segmentName = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
816+
if (segmentName > result.maxSegmentName) {
817+
result.maxSegmentName = segmentName;
818+
}
819+
}
820+
821+
private void processSegmentInfoStatusResult(
822+
Status result, SegmentCommitInfo info, Status.SegmentInfoStatus segmentInfoStatus) {
823+
result.segmentInfos.add(segmentInfoStatus);
824+
if (segmentInfoStatus.error != null) {
825+
result.totLoseDocCount += segmentInfoStatus.toLoseDocCount;
826+
result.numBadSegments++;
827+
} else {
828+
// Keeper
829+
result.newSegments.add(info.clone());
830+
}
831+
}
832+
833+
private <R> CompletableFuture<R> runAsyncSegmentCheck(
834+
Callable<R> asyncCallable, ExecutorService executorService) {
835+
return CompletableFuture.supplyAsync(callableToSupplier(asyncCallable), executorService);
836+
}
837+
838+
private <T> Supplier<T> callableToSupplier(Callable<T> callable) {
839+
return () -> {
840+
try {
841+
return callable.call();
842+
} catch (RuntimeException | Error e) {
843+
throw e;
844+
} catch (Throwable e) {
845+
throw new CompletionException(e);
846+
}
847+
};
848+
}
849+
734850
private Status.SegmentInfoStatus testSegment(
735851
SegmentInfos sis, SegmentCommitInfo info, PrintStream infoStream) throws IOException {
736852
Status.SegmentInfoStatus segInfoStat = new Status.SegmentInfoStatus();
@@ -923,8 +1039,6 @@ private Status.SegmentInfoStatus testSegment(
9231039
"Soft Deletes test failed", segInfoStat.softDeletesStatus.error);
9241040
}
9251041
}
926-
927-
msg(infoStream, "");
9281042
} catch (Throwable t) {
9291043
if (failFast) {
9301044
throw IOUtils.rethrowAlways(t);
@@ -3830,15 +3944,7 @@ public static Options parseOptions(String[] args) {
38303944
throw new IllegalArgumentException("-threadCount requires a following number");
38313945
}
38323946
i++;
3833-
int providedThreadCount = Integer.parseInt(args[i]);
3834-
// Current implementation supports up to 11 concurrent checks at any time, and no
3835-
// concurrency across segments.
3836-
// Capping the thread count to 11 to avoid unnecessary threads to be created.
3837-
if (providedThreadCount > MAX_PER_SEGMENT_CONCURRENCY) {
3838-
System.out.println(
3839-
"-threadCount currently only supports up to 11 threads. Value higher than that will be capped.");
3840-
}
3841-
opts.threadCount = Math.min(providedThreadCount, MAX_PER_SEGMENT_CONCURRENCY);
3947+
opts.threadCount = Integer.parseInt(args[i]);
38423948
} else {
38433949
if (opts.indexPath != null) {
38443950
throw new IllegalArgumentException("ERROR: unexpected extra argument '" + args[i] + "'");
@@ -3906,10 +4012,9 @@ public int doCheck(Options opts) throws IOException, InterruptedException {
39064012
setDoSlowChecks(opts.doSlowChecks);
39074013
setChecksumsOnly(opts.doChecksumsOnly);
39084014
setInfoStream(opts.out, opts.verbose);
3909-
// when threadCount was not provided via command line, don't overwrite the default
3910-
if (opts.threadCount > 0) {
3911-
setThreadCount(opts.threadCount);
3912-
}
4015+
// when threadCount was not provided via command line, override it with 0 to turn off concurrent
4016+
// check
4017+
setThreadCount(opts.threadCount);
39134018

39144019
Status result = checkIndex(opts.onlySegments);
39154020

lucene/core/src/test/org/apache/lucene/index/TestAllFilesDetectTruncation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private void truncateOneFile(Directory dir, String victim) throws IOException {
121121
// CheckIndex should also fail:
122122
expectThrowsAnyOf(
123123
Arrays.asList(CorruptIndexException.class, EOFException.class),
124-
() -> TestUtil.checkIndex(dirCopy, true, true, null));
124+
() -> TestUtil.checkIndex(dirCopy, true, true, true, null));
125125
}
126126
}
127127
}

lucene/core/src/test/org/apache/lucene/index/TestCheckIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void testCheckIndexAllValid() throws Exception {
122122
}
123123

124124
ByteArrayOutputStream output = new ByteArrayOutputStream();
125-
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
125+
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);
126126

127127
assertEquals(1, status.segmentInfos.size());
128128

lucene/core/src/test/org/apache/lucene/index/TestPointValues.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ public void testCheckIndexIncludesPoints() throws Exception {
726726
w.close();
727727

728728
ByteArrayOutputStream output = new ByteArrayOutputStream();
729-
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
729+
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);
730730
assertEquals(1, status.segmentInfos.size());
731731
CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0);
732732
// total 3 point values were index:

lucene/core/src/test/org/apache/lucene/index/TestSwappedIndexFiles.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,11 @@ private void swapOneFile(Directory dir1, Directory dir2, String victim) throws I
116116
// CheckIndex should also fail:
117117
expectThrowsAnyOf(
118118
Arrays.asList(
119-
CorruptIndexException.class, EOFException.class, IndexFormatTooOldException.class),
120-
() -> TestUtil.checkIndex(dirCopy, true, true, null));
119+
CorruptIndexException.class,
120+
EOFException.class,
121+
IndexFormatTooOldException.class,
122+
CheckIndex.CheckIndexException.class),
123+
() -> TestUtil.checkIndex(dirCopy, true, true, true, null));
121124
}
122125
}
123126
}

lucene/test-framework/src/java/org/apache/lucene/index/BaseVectorFormatTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ public void testCheckIndexIncludesVectors() throws Exception {
941941
}
942942

943943
ByteArrayOutputStream output = new ByteArrayOutputStream();
944-
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, output);
944+
CheckIndex.Status status = TestUtil.checkIndex(dir, false, true, true, output);
945945
assertEquals(1, status.segmentInfos.size());
946946
CheckIndex.Status.SegmentInfoStatus segStatus = status.segmentInfos.get(0);
947947
// total 3 vector values were indexed:

lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,11 @@ public synchronized void close() throws IOException {
895895
System.out.println("\nNOTE: MockDirectoryWrapper: now run CheckIndex");
896896
}
897897

898-
TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, null);
898+
// Methods in MockDirectoryWrapper hold the lock on this, which causes a deadlock when
899+
// TestUtil#checkIndex checks segments concurrently on other threads that call back into
900+
// synchronized methods such as MockDirectoryWrapper#fileLength.
901+
// Hence pass concurrent = false to this method to turn off concurrent checks.
902+
TestUtil.checkIndex(this, getCrossCheckTermVectorsOnClose(), true, false, null);
899903
}
900904

901905
// TODO: factor this out / share w/ TestIW.assertNoUnreferencedFiles

lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,15 +302,19 @@ public static CheckIndex.Status checkIndex(Directory dir) throws IOException {
302302

303303
public static CheckIndex.Status checkIndex(Directory dir, boolean doSlowChecks)
304304
throws IOException {
305-
return checkIndex(dir, doSlowChecks, false, null);
305+
return checkIndex(dir, doSlowChecks, false, true, null);
306306
}
307307

308308
/**
309309
* If failFast is true, then throw the first exception when index corruption is hit, instead of
310310
* moving on to other fields/segments to look for any other corruption.
311311
*/
312312
public static CheckIndex.Status checkIndex(
313-
Directory dir, boolean doSlowChecks, boolean failFast, ByteArrayOutputStream output)
313+
Directory dir,
314+
boolean doSlowChecks,
315+
boolean failFast,
316+
boolean concurrent,
317+
ByteArrayOutputStream output)
314318
throws IOException {
315319
if (output == null) {
316320
output = new ByteArrayOutputStream(1024);
@@ -322,7 +326,11 @@ public static CheckIndex.Status checkIndex(
322326
checker.setDoSlowChecks(doSlowChecks);
323327
checker.setFailFast(failFast);
324328
checker.setInfoStream(new PrintStream(output, false, IOUtils.UTF_8), false);
325-
checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5));
329+
if (concurrent) {
330+
checker.setThreadCount(RandomizedTest.randomIntBetween(1, 5));
331+
} else {
332+
checker.setThreadCount(0);
333+
}
326334
CheckIndex.Status indexStatus = checker.checkIndex(null);
327335

328336
if (indexStatus == null || indexStatus.clean == false) {

0 commit comments

Comments
 (0)