Skip to content

Commit 9c6683f

Browse files
Return MergeObserver from IndexWriter.forceMergeDeletes() (#14515)
IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver instead of void, allowing applications to monitor merge progress and wait for completion. This enables coordination between merge completion and other application logic, and supports both synchronous (await) and asynchronous (CompletableFuture) waiting patterns. Key capabilities: - Query merge status: hasNewMerges(), numMerges() - Wait synchronously: await(), await(timeout, unit) - Wait asynchronously: awaitAsync() returns CompletableFuture<Void> - Inspect individual merges: getMerge(int) Changes: - Add MergePolicy.MergeObserver nested class - Update IndexWriter.forceMergeDeletes() methods to return MergeObserver - Update RandomIndexWriter to propagate return type - Add comprehensive tests (blocking/non-blocking modes, timeout handling) Backward compatible: existing code that ignores the return value continues to work without modification.
1 parent 912e906 commit 9c6683f

File tree

5 files changed

+442
-15
lines changed

5 files changed

+442
-15
lines changed

lucene/CHANGES.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ Improvements
6969

7070
* GITHUB#15124: Use RamUsageEstimator to calculate size for non-accountable queries. (Sagar Upadhyaya)
7171

72+
* GITHUB#14515: IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver,
73+
allowing applications to monitor merge progress, wait for completion (synchronously
74+
via await() or asynchronously via CompletableFuture), and inspect individual merges.
75+
Backward compatible - existing code that ignores the return value works unchanged.
76+
(Salvatore Campagna)
77+
7278
Optimizations
7379
---------------------
7480
* GITHUB#14011: Reduce allocation rate in HNSW concurrent merge. (Viliam Durina)

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2221,8 +2221,10 @@ private synchronized boolean maxNumSegmentsMergesPending() {
22212221
* Just like {@link #forceMergeDeletes()}, except you can specify whether the call should block
22222222
* until the operation completes. This is only meaningful with a {@link MergeScheduler} that is
22232223
* able to run merges in background threads.
2224+
*
2225+
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress and wait for completion
22242226
*/
2225-
public void forceMergeDeletes(boolean doWait) throws IOException {
2227+
public MergePolicy.MergeObserver forceMergeDeletes(boolean doWait) throws IOException {
22262228
ensureOpen();
22272229

22282230
flush(true, true);
@@ -2234,20 +2236,20 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22342236
final MergePolicy mergePolicy = config.getMergePolicy();
22352237
final CachingMergeContext cachingMergeContext = new CachingMergeContext(this);
22362238
MergePolicy.MergeSpecification spec;
2237-
boolean newMergesFound = false;
2239+
MergePolicy.MergeObserver observer;
22382240
synchronized (this) {
22392241
spec = mergePolicy.findForcedDeletesMerges(segmentInfos, cachingMergeContext);
2240-
newMergesFound = spec != null;
2241-
if (newMergesFound) {
2242-
final int numMerges = spec.merges.size();
2243-
for (int i = 0; i < numMerges; i++) registerMerge(spec.merges.get(i));
2242+
observer = new MergePolicy.MergeObserver(spec);
2243+
if (observer.hasNewMerges()) {
2244+
final int numMerges = observer.numMerges();
2245+
for (int i = 0; i < numMerges; i++) registerMerge(observer.getMerge(i));
22442246
}
22452247
}
22462248

22472249
mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);
22482250

2249-
if (spec != null && doWait) {
2250-
final int numMerges = spec.merges.size();
2251+
if (observer.hasNewMerges() && doWait) {
2252+
final int numMerges = observer.numMerges();
22512253
synchronized (this) {
22522254
boolean running = true;
22532255
while (running) {
@@ -2263,7 +2265,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22632265
// if any of them have hit an exception.
22642266
running = false;
22652267
for (int i = 0; i < numMerges; i++) {
2266-
final MergePolicy.OneMerge merge = spec.merges.get(i);
2268+
final MergePolicy.OneMerge merge = observer.getMerge(i);
22672269
if (pendingMerges.contains(merge) || runningMerges.contains(merge)) {
22682270
running = true;
22692271
}
@@ -2282,6 +2284,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22822284
// NOTE: in the ConcurrentMergeScheduler case, when
22832285
// doWait is false, we can return immediately while
22842286
// background threads accomplish the merging
2287+
return observer;
22852288
}
22862289

22872290
/**
@@ -2296,9 +2299,12 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
22962299
*
22972300
* <p><b>NOTE</b>: this method first flushes a new segment (if there are indexed documents), and
22982301
* applies all buffered deletes.
2302+
*
2303+
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress. Since this method blocks
2304+
* until completion, merges will already be complete when it returns.
22992305
*/
2300-
public void forceMergeDeletes() throws IOException {
2301-
forceMergeDeletes(true);
2306+
public MergePolicy.MergeObserver forceMergeDeletes() throws IOException {
2307+
return forceMergeDeletes(true);
23022308
}
23032309

23042310
/**

lucene/core/src/java/org/apache/lucene/index/MergePolicy.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,4 +939,96 @@ static final class MergeReader {
939939
this.hardLiveDocs = hardLiveDocs;
940940
}
941941
}
942+
943+
/**
944+
* Observer for merge operations returned by {@link IndexWriter#forceMergeDeletes(boolean)}.
945+
* Provides methods to query merge status and wait for completion.
946+
*
947+
* <p>When no merges are needed, {@link #hasNewMerges()} returns {@code false} and {@link
948+
* #numMerges()} returns 0. In this case, {@link #await()} returns {@code true} immediately since
949+
* there is nothing to wait for.
950+
*
951+
* @lucene.experimental
952+
*/
953+
public static final class MergeObserver {
954+
private final MergePolicy.MergeSpecification spec;
955+
956+
MergeObserver(MergePolicy.MergeSpecification spec) {
957+
this.spec = spec;
958+
}
959+
960+
/**
961+
* Returns the number of merges in this specification.
962+
*
963+
* @return number of merges, or 0 if no merges were scheduled
964+
*/
965+
public int numMerges() {
966+
return spec == null ? 0 : spec.merges.size();
967+
}
968+
969+
/**
970+
* Returns whether any new merges were scheduled.
971+
*
972+
* @return {@code true} if merges were scheduled, {@code false} if no merges needed
973+
*/
974+
public boolean hasNewMerges() {
975+
return spec != null;
976+
}
977+
978+
/**
979+
* Waits for all merges in this specification to complete. Returns immediately if no merges were
980+
* scheduled.
981+
*
982+
* @return {@code true} if all merges completed successfully or no merges were needed, {@code
983+
* false} on error
984+
*/
985+
public boolean await() {
986+
return spec == null || spec.await();
987+
}
988+
989+
/**
990+
* Waits for all merges in this specification to complete, with timeout. Returns immediately if
991+
* no merges were scheduled.
992+
*
993+
* @param timeout maximum time to wait
994+
* @param unit time unit for timeout
995+
* @return {@code true} if all merges completed within timeout or no merges were needed, {@code
996+
* false} on timeout or error
997+
*/
998+
public boolean await(long timeout, TimeUnit unit) {
999+
return spec == null || spec.await(timeout, unit);
1000+
}
1001+
1002+
/**
1003+
* Returns a {@link CompletableFuture} that completes when all merges finish. Returns an
1004+
* already-completed future if no merges were scheduled.
1005+
*
1006+
* @return future that completes when merges finish
1007+
*/
1008+
public CompletableFuture<Void> awaitAsync() {
1009+
return spec == null
1010+
? CompletableFuture.completedFuture(null)
1011+
: spec.getMergeCompletedFutures();
1012+
}
1013+
1014+
@Override
1015+
public String toString() {
1016+
return spec == null ? "MergeObserver: no merges" : spec.toString();
1017+
}
1018+
1019+
/**
1020+
* Returns the merge at the specified index. Caller must ensure {@link #hasNewMerges()} returns
1021+
* {@code true} and index is within bounds.
1022+
*
1023+
* @param i merge index (0 to {@link #numMerges()} - 1)
1024+
* @return the merge at index i
1025+
* @throws IndexOutOfBoundsException if index is invalid or no merges exist
1026+
*/
1027+
public MergePolicy.OneMerge getMerge(int i) {
1028+
if (spec == null) {
1029+
throw new IndexOutOfBoundsException("No merges available");
1030+
}
1031+
return spec.merges.get(i);
1032+
}
1033+
}
9421034
}

0 commit comments

Comments
 (0)