
Clean up unreferenced files on segment merge failure
Signed-off-by: Rishav Sagar <rissag@amazon.com>
Rishav Sagar committed Aug 22, 2023
1 parent 61c5f17 commit dec0641
Showing 7 changed files with 305 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -148,6 +148,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
- Clean up unreferenced files on segment merge failure ([#9483](https://github.com/opensearch-project/OpenSearch/pull/9483))

### Deprecated

@@ -169,6 +169,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THROTTLED,
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -511,6 +511,18 @@ public final class IndexSettings {
Property.Dynamic
);

/**
* This setting controls whether unreferenced files are cleaned up when a segment merge fails because the disk is full.
*
* Defaults to {@code true}, which means unreferenced files are cleaned up when a segment merge fails.
*/
public static final Setting<Boolean> INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting(
"index.unreferenced_file_cleanup.enabled",
true,
Property.IndexScope,
Property.Dynamic
);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
@@ -676,6 +688,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile String defaultPipeline;
private volatile String requiredPipeline;
private volatile boolean searchThrottled;
private volatile boolean shouldCleanupUnreferencedFile;
private volatile long mappingNestedFieldsLimit;
private volatile long mappingNestedDocsLimit;
private volatile long mappingTotalFieldsLimit;
@@ -793,6 +806,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFile = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
@@ -905,6 +919,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFile);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit);
@@ -1528,6 +1543,18 @@ private void setSearchThrottled(boolean searchThrottled) {
this.searchThrottled = searchThrottled;
}

/**
* Returns {@code true} if unreferenced files should be cleaned up on merge failure for this index.
*/
public boolean shouldCleanupUnreferencedFile() {
return shouldCleanupUnreferencedFile;
}

private void setShouldCleanupUnreferencedFile(boolean shouldCleanupUnreferencedFile) {
this.shouldCleanupUnreferencedFile = shouldCleanupUnreferencedFile;
}

public long getMappingNestedFieldsLimit() {
return mappingNestedFieldsLimit;
}
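A minimal sketch (not part of this commit) of how the new dynamic setting could be flipped on a live index. It assumes a node-level Client is in scope; the setting key comes from the INDEX_UNREFERENCED_FILE_CLEANUP declaration above, and the class and method names here are hypothetical.

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

final class UnreferencedFileCleanupExample {
    // The setting is declared Dynamic and IndexScope, so it can be updated
    // without closing and reopening the index.
    static void disableCleanup(Client client, String index) {
        client.admin()
            .indices()
            .updateSettings(
                new UpdateSettingsRequest(index).settings(
                    Settings.builder().put(IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.getKey(), false)
                )
            )
            .actionGet();
    }
}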
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -38,7 +38,10 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
@@ -950,6 +953,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm
}
}

boolean shouldCleanupUnreferencedFile() {
return engineConfig.getIndexSettings().shouldCleanupUnreferencedFile();
}

private Map<String, Long> getSegmentFileSizes(SegmentReader segmentReader) {
Directory directory = null;
SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo();
@@ -1291,6 +1298,20 @@ public void failEngine(String reason, @Nullable Exception failure) {
);
}
}

// If the unreferenced-file cleanup flag is enabled and a force merge or regular merge failed because the
// disk was full, clean up all unreferenced files created during the failed merge and reset the shard
// state back to the last Lucene commit.
if (shouldCleanupUnreferencedFile()
&& (reason.equals("force merge") || reason.equals("merge failed"))
&& failure != null
&& failure.getCause() != null
&& failure.getCause().getCause() != null
&& failure.getCause().getCause().getMessage() != null
&& failure.getCause().getCause().getMessage().contains("No space left on device")) {
cleanUpUnreferencedFiles();
}

eventListener.onFailedEngine(reason, failure);
}
} catch (Exception inner) {
@@ -1309,6 +1330,26 @@ public void failEngine(String reason, @Nullable Exception failure) {
}
}
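The guard in failEngine above hard-codes a two-level getCause() walk because a disk-full merge failure typically reaches the engine wrapped twice (the merge exception wrapping the original IOException). A hedged sketch of the predicate it implements, with a hypothetical helper name:

// Hypothetical helper, equivalent to the nested null checks in failEngine:
// it matches a merge failure whose second-level cause carries the message
// Lucene surfaces when the device runs out of space.
static boolean isDiskFullMergeFailure(String reason, Exception failure) {
    if (!"force merge".equals(reason) && !"merge failed".equals(reason)) {
        return false;
    }
    Throwable cause = failure == null ? null : failure.getCause();
    Throwable root = cause == null ? null : cause.getCause();
    return root != null && root.getMessage() != null && root.getMessage().contains("No space left on device");
}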

/**
* Cleans up all unreferenced files generated during a failed segment merge. This resets the shard state to the
* last Lucene commit.
*/
private void cleanUpUnreferencedFiles() {
try (
IndexWriter writer = new IndexWriter(
store.directory(),
new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// Do nothing and close: constructing the writer kicks off IndexFileDeleter, which removes all unreferenced files.
} catch (Exception ex) {
logger.error("Error while deleting unreferenced files", ex);
}
}
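Why this works: opening an IndexWriter in APPEND mode loads the last commit point, and Lucene's internal IndexFileDeleter then deletes every index file in the directory that the commit does not reference; setCommitOnClose(false) ensures closing the writer writes no new commit, and NoMergePolicy prevents new merges from starting while the writer is open. A self-contained sketch of the same trick against a raw Lucene directory (the class name and path argument are illustrative):

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;

public class PruneUnreferencedFiles {
    public static void main(String[] args) throws Exception {
        try (
            FSDirectory dir = FSDirectory.open(Paths.get(args[0]));
            IndexWriter writer = new IndexWriter(
                dir,
                new IndexWriterConfig(new StandardAnalyzer()).setCommitOnClose(false)
                    .setMergePolicy(NoMergePolicy.INSTANCE)
                    .setOpenMode(IndexWriterConfig.OpenMode.APPEND)
            )
        ) {
            // Intentionally empty: constructing the writer prunes files not
            // referenced by the last commit; closing without committing
            // leaves the committed data untouched.
        }
    }
}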

/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Exception e) {
if (Lucene.isCorruptionException(e)) {
@@ -40,6 +40,7 @@
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.KeywordField;
import org.apache.lucene.document.LongPoint;
@@ -3229,6 +3230,197 @@ public void testFailStart() throws IOException {
}
}

public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpEnabled() throws Exception {
MockDirectoryWrapper wrapper = newMockDirectory();
MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
public boolean didFail1;
public boolean didFail2;

@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
if (!doFail) {
return;
}
if (callStackContainsAnyOf("mergeTerms") && !didFail1) {
didFail1 = true;
throw new IOException("No space left on device");
}
if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) {
didFail2 = true;
throw new IOException("No space left on device");
}
}
};

wrapper.failOn(fail);
try {
Store store = createStore(wrapper);
final Engine.EventListener eventListener = new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, Exception e) {
try {
// Since only one document is committed and unreferenced files are cleaned up,
// there are 4 files (*cfs, *cfe, *si and segments_*).
assertThat(store.directory().listAll().length, equalTo(4));
store.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
};

final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final AtomicLong retentionLeasesVersion = new AtomicLong();
final AtomicReference<RetentionLeases> retentionLeasesHolder = new AtomicReference<>(
new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList())
);
InternalEngine engine = createEngine(
config(
defaultSettings,
store,
createTempDir(),
newMergePolicy(),
null,
null,
null,
globalCheckpoint::get,
retentionLeasesHolder::get,
new NoneCircuitBreakerService(),
eventListener
)
);

List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));

ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
engine.refresh("test");
engine.flush();

segments = engine.segments(false);
assertThat(segments.size(), equalTo(1));

ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null);
engine.index(indexForDoc(doc2));
engine.refresh("test");

segments = engine.segments(false);
assertThat(segments.size(), equalTo(2));

fail.setDoFail();
// IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not.
expectThrowsAnyOf(
Arrays.asList(IOException.class, IllegalStateException.class),
() -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID())
);
engine.close();
} finally {
wrapper.close();
}
}

public void testUnreferencedFileCleanUpOnSegmentMergeFailureWithCleanUpDisabled() throws Exception {
MockDirectoryWrapper wrapper = newMockDirectory();
MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
public boolean didFail1;
public boolean didFail2;

@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
if (!doFail) {
return;
}
if (callStackContainsAnyOf("mergeTerms") && !didFail1) {
didFail1 = true;
throw new IOException("No space left on device");
}
if (callStackContains(LiveDocsFormat.class, "writeLiveDocs") && !didFail2) {
didFail2 = true;
throw new IOException("No space left on device");
}
}
};

wrapper.failOn(fail);
try {
Store store = createStore(wrapper);
final Engine.EventListener eventListener = new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, Exception e) {
try {
// Since cleanup is disabled here, all unreferenced files are still present.
assertThat(store.directory().listAll().length, equalTo(13));
} catch (IOException ex) {
throw new RuntimeException(ex);
} finally {
store.close();
}
}
};

final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final AtomicLong retentionLeasesVersion = new AtomicLong();
final AtomicReference<RetentionLeases> retentionLeasesHolder = new AtomicReference<>(
new RetentionLeases(primaryTerm, retentionLeasesVersion.get(), Collections.emptyList())
);
InternalEngine engine = createEngine(
config(
defaultSettings,
store,
createTempDir(),
newMergePolicy(),
null,
null,
null,
globalCheckpoint::get,
retentionLeasesHolder::get,
new NoneCircuitBreakerService(),
eventListener
)
);

// Disable cleanup
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
.settings(
Settings.builder().put(indexSettings.getSettings()).put(IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.getKey(), false)
)
.build();
indexSettings.updateIndexMetadata(indexMetadata);

List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));

ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
engine.refresh("test");
engine.flush();

segments = engine.segments(false);
assertThat(segments.size(), equalTo(1));

ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null);
engine.index(indexForDoc(doc2));
engine.refresh("test");

segments = engine.segments(false);
assertThat(segments.size(), equalTo(2));

fail.setDoFail();
// IndexWriter can throw either IOException or IllegalStateException depending on whether tragedy is set or not.
expectThrowsAnyOf(
Arrays.asList(IOException.class, IllegalStateException.class),
() -> engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID())
);
engine.close();
} finally {
wrapper.close();
}
}

public void testSettings() {
CodecService codecService = new CodecService(null, engine.config().getIndexSettings(), logger);
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
@@ -840,10 +840,39 @@ public EngineConfig config(
final @Nullable Supplier<RetentionLeases> maybeRetentionLeasesSupplier,
final CircuitBreakerService breakerService
) {
final IndexWriterConfig iwc = newIndexWriterConfig();
final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final Engine.EventListener eventListener = new Engine.EventListener() {
}; // we don't need to notify anybody in this test

return config(
indexSettings,
store,
translogPath,
mergePolicy,
externalRefreshListener,
internalRefreshListener,
indexSort,
maybeGlobalCheckpointSupplier,
maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY,
breakerService,
eventListener
);
}

public EngineConfig config(
final IndexSettings indexSettings,
final Store store,
final Path translogPath,
final MergePolicy mergePolicy,
final ReferenceManager.RefreshListener externalRefreshListener,
final ReferenceManager.RefreshListener internalRefreshListener,
final Sort indexSort,
final @Nullable LongSupplier maybeGlobalCheckpointSupplier,
final @Nullable Supplier<RetentionLeases> maybeRetentionLeasesSupplier,
final CircuitBreakerService breakerService,
final Engine.EventListener eventListener
) {
final IndexWriterConfig iwc = newIndexWriterConfig();
final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final List<ReferenceManager.RefreshListener> extRefreshListenerList = externalRefreshListener == null
? emptyList()
: Collections.singletonList(externalRefreshListener);
