Skip to content

Commit

Permalink
Add unreferenced file cleanup count to merge stats
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <rissag@amazon.com>
  • Loading branch information
Rishav Sagar committed Oct 3, 2023
1 parent 3a790c1 commit dc9785d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public abstract class Engine implements LifecycleAware, Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
private final CounterMetric totalUnreferencedFileCleanUpsPerformed = new CounterMetric();
private final CountDownLatch closedLatch = new CountDownLatch(1);
protected final EventListener eventListener;
protected final ReentrantLock failEngineLock = new ReentrantLock();
Expand Down Expand Up @@ -267,6 +268,13 @@ protected final DocsStats docsStats(IndexReader indexReader) {
return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
}

/**
* Returns the unreferenced file cleanup count for this engine
*/
public long unreferencedFileCleanUpsPerformed() {
return totalUnreferencedFileCleanUpsPerformed.count();
}

/**
* Performs the pre-closing checks on the {@link Engine}.
*
Expand Down Expand Up @@ -1340,7 +1348,9 @@ private void cleanUpUnreferencedFiles() {
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files.
// do nothing except increasing metric count and close this will kick off IndexFileDeleter which will
// remove all unreferenced files
totalUnreferencedFileCleanUpsPerformed.inc();
} catch (Exception ex) {
logger.error("Error while deleting unreferenced file ", ex);
}
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/opensearch/index/merge/MergeStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.merge;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -65,9 +66,9 @@ public class MergeStats implements Writeable, ToXContentFragment {

private long totalBytesPerSecAutoThrottle;

public MergeStats() {
private long unreferencedFileCleanUpsPerformed;

}
public MergeStats() {}

public MergeStats(StreamInput in) throws IOException {
total = in.readVLong();
Expand All @@ -81,6 +82,9 @@ public MergeStats(StreamInput in) throws IOException {
totalStoppedTimeInMillis = in.readVLong();
totalThrottledTimeInMillis = in.readVLong();
totalBytesPerSecAutoThrottle = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
unreferencedFileCleanUpsPerformed = in.readOptionalVLong();
}
}

public void add(
Expand Down Expand Up @@ -133,13 +137,22 @@ public void addTotals(MergeStats mergeStats) {
this.totalSizeInBytes += mergeStats.totalSizeInBytes;
this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
addUnreferencedFileCleanUpStats(mergeStats.unreferencedFileCleanUpsPerformed);
if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || mergeStats.totalBytesPerSecAutoThrottle == Long.MAX_VALUE) {
this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
} else {
this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle;
}
}

public void addUnreferencedFileCleanUpStats(long unreferencedFileCleanUpsPerformed) {
this.unreferencedFileCleanUpsPerformed += unreferencedFileCleanUpsPerformed;
}

public long getUnreferencedFileCleanUpsPerformed() {
return this.unreferencedFileCleanUpsPerformed;
}

/**
* The total number of merges executed.
*/
Expand Down Expand Up @@ -240,6 +253,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.TOTAL_THROTTLE_BYTES_PER_SEC).value(new ByteSizeValue(totalBytesPerSecAutoThrottle).toString());
}
builder.field(Fields.TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES, totalBytesPerSecAutoThrottle);
builder.field(Fields.UNREFERENCED_FILE_CLEANUPS_PERFORMED, unreferencedFileCleanUpsPerformed);
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -267,6 +281,7 @@ static final class Fields {
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
static final String TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES = "total_auto_throttle_in_bytes";
static final String TOTAL_THROTTLE_BYTES_PER_SEC = "total_auto_throttle";
static final String UNREFERENCED_FILE_CLEANUPS_PERFORMED = "unreferenced_file_cleanups_performed";
}

@Override
Expand All @@ -282,5 +297,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalStoppedTimeInMillis);
out.writeVLong(totalThrottledTimeInMillis);
out.writeVLong(totalBytesPerSecAutoThrottle);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalVLong(unreferencedFileCleanUpsPerformed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,9 @@ public MergeStats mergeStats() {
if (engine == null) {
return new MergeStats();
}
return engine.getMergeStats();
final MergeStats mergeStats = engine.getMergeStats();
mergeStats.addUnreferencedFileCleanUpStats(engine.unreferencedFileCleanUpsPerformed());
return mergeStats;
}

public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -3334,6 +3333,9 @@ public void onFailedEngine(String reason, Exception e) {
);

assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS));
// Cleanup count will be incremented whenever cleanup is performed correctly.
long unreferencedFileCleanUpsPerformed = engine.unreferencedFileCleanUpsPerformed();
assertThat(unreferencedFileCleanUpsPerformed, equalTo(1L));
} catch (Exception ex) {
throw new AssertionError(ex);
}
Expand Down Expand Up @@ -3445,6 +3447,9 @@ public void onFailedEngine(String reason, Exception e) {
);

assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS));
// Cleanup count will not be incremented whenever cleanup is disabled.
long unreferencedFileCleanUpsPerformed = engine.unreferencedFileCleanUpsPerformed();
assertThat(unreferencedFileCleanUpsPerformed, equalTo(0L));
} catch (Exception ex) {
throw new AssertionError(ex);
}
Expand Down Expand Up @@ -3549,6 +3554,9 @@ public void onFailedEngine(String reason, Exception e) {
);

assertTrue(cleanupCompleted.await(10, TimeUnit.SECONDS));
// Cleanup count will not be incremented whenever there is some issue with cleanup.
long unreferencedFileCleanUpsPerformed = engine.unreferencedFileCleanUpsPerformed();
assertThat(unreferencedFileCleanUpsPerformed, equalTo(0L));
} catch (Exception ex) {
throw new AssertionError(ex);
}
Expand Down

0 comments on commit dc9785d

Please sign in to comment.