Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ Improvements

* GITHUB#15124: Use RamUsageEstimator to calculate size for non-accountable queries. (Sagar Upadhyaya)

* GITHUB#14515: IndexWriter.forceMergeDeletes() now returns MergePolicy.MergeObserver,
allowing applications to monitor merge progress, wait for completion (synchronously
via await() or asynchronously via CompletableFuture), and inspect individual merges.
Backward compatible - existing code that ignores the return value works unchanged.
(Salvatore Campagna)

Optimizations
---------------------
* GITHUB#14011: Reduce allocation rate in HNSW concurrent merge. (Viliam Durina)
Expand Down
28 changes: 17 additions & 11 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2221,8 +2221,10 @@ private synchronized boolean maxNumSegmentsMergesPending() {
* Just like {@link #forceMergeDeletes()}, except you can specify whether the call should block
* until the operation completes. This is only meaningful with a {@link MergeScheduler} that is
* able to run merges in background threads.
*
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress and wait for completion
*/
public void forceMergeDeletes(boolean doWait) throws IOException {
public MergePolicy.MergeObserver forceMergeDeletes(boolean doWait) throws IOException {
ensureOpen();

flush(true, true);
Expand All @@ -2234,20 +2236,20 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
final MergePolicy mergePolicy = config.getMergePolicy();
final CachingMergeContext cachingMergeContext = new CachingMergeContext(this);
MergePolicy.MergeSpecification spec;
boolean newMergesFound = false;
MergePolicy.MergeObserver observer;
synchronized (this) {
spec = mergePolicy.findForcedDeletesMerges(segmentInfos, cachingMergeContext);
newMergesFound = spec != null;
if (newMergesFound) {
final int numMerges = spec.merges.size();
for (int i = 0; i < numMerges; i++) registerMerge(spec.merges.get(i));
observer = new MergePolicy.MergeObserver(spec);
if (observer.hasNewMerges()) {
final int numMerges = observer.numMerges();
for (int i = 0; i < numMerges; i++) registerMerge(observer.getMerge(i));
}
}

mergeScheduler.merge(mergeSource, MergeTrigger.EXPLICIT);

if (spec != null && doWait) {
final int numMerges = spec.merges.size();
if (observer.hasNewMerges() && doWait) {
final int numMerges = observer.numMerges();
synchronized (this) {
boolean running = true;
while (running) {
Expand All @@ -2263,7 +2265,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
// if any of them have hit an exception.
running = false;
for (int i = 0; i < numMerges; i++) {
final MergePolicy.OneMerge merge = spec.merges.get(i);
final MergePolicy.OneMerge merge = observer.getMerge(i);
if (pendingMerges.contains(merge) || runningMerges.contains(merge)) {
running = true;
}
Expand All @@ -2282,6 +2284,7 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
// NOTE: in the ConcurrentMergeScheduler case, when
// doWait is false, we can return immediately while
// background threads accomplish the merging
return observer;
}

/**
Expand All @@ -2296,9 +2299,12 @@ public void forceMergeDeletes(boolean doWait) throws IOException {
*
* <p><b>NOTE</b>: this method first flushes a new segment (if there are indexed documents), and
* applies all buffered deletes.
*
* @return a {@link MergePolicy.MergeObserver} to monitor merge progress. Since this method blocks
* until completion, merges will already be complete when it returns.
*/
public void forceMergeDeletes() throws IOException {
forceMergeDeletes(true);
public MergePolicy.MergeObserver forceMergeDeletes() throws IOException {
return forceMergeDeletes(true);
}

/**
Expand Down
92 changes: 92 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -939,4 +939,96 @@ static final class MergeReader {
this.hardLiveDocs = hardLiveDocs;
}
}

/**
* Observer for merge operations returned by {@link IndexWriter#forceMergeDeletes(boolean)}.
* Provides methods to query merge status and wait for completion.
*
* <p>When no merges are needed, {@link #hasNewMerges()} returns {@code false} and {@link
* #numMerges()} returns 0. In this case, {@link #await()} returns {@code true} immediately since
* there is nothing to wait for.
*
* @lucene.experimental
*/
public static final class MergeObserver {
private final MergePolicy.MergeSpecification spec;

MergeObserver(MergePolicy.MergeSpecification spec) {
this.spec = spec;
}

/**
* Returns the number of merges in this specification.
*
* @return number of merges, or 0 if no merges were scheduled
*/
public int numMerges() {
return spec == null ? 0 : spec.merges.size();
}

/**
* Returns whether any new merges were scheduled.
*
* @return {@code true} if merges were scheduled, {@code false} if no merges needed
*/
public boolean hasNewMerges() {
return spec != null;
}

/**
* Waits for all merges in this specification to complete. Returns immediately if no merges were
* scheduled.
*
* @return {@code true} if all merges completed successfully or no merges were needed, {@code
* false} on error
*/
public boolean await() {
return spec == null || spec.await();
}

/**
* Waits for all merges in this specification to complete, with timeout. Returns immediately if
* no merges were scheduled.
*
* @param timeout maximum time to wait
* @param unit time unit for timeout
* @return {@code true} if all merges completed within timeout or no merges were needed, {@code
* false} on timeout or error
*/
public boolean await(long timeout, TimeUnit unit) {
return spec == null || spec.await(timeout, unit);
}

/**
* Returns a {@link CompletableFuture} that completes when all merges finish. Returns an
* already-completed future if no merges were scheduled.
*
* @return future that completes when merges finish
*/
public CompletableFuture<Void> awaitAsync() {
return spec == null
? CompletableFuture.completedFuture(null)
: spec.getMergeCompletedFutures();
}

@Override
public String toString() {
return spec == null ? "MergeObserver: no merges" : spec.toString();
}

/**
* Returns the merge at the specified index. Caller must ensure {@link #hasNewMerges()} returns
* {@code true} and index is within bounds.
*
* @param i merge index (0 to {@link #numMerges()} - 1)
* @return the merge at index i
* @throws IndexOutOfBoundsException if index is invalid or no merges exist
*/
public MergePolicy.OneMerge getMerge(int i) {
if (spec == null) {
throw new IndexOutOfBoundsException("No merges available");
}
return spec.merges.get(i);
}
}
}
Loading
Loading