Skip to content

Commit

Permalink
Segment Replication - Fix ShardLockObtained error during corruption c…
Browse files Browse the repository at this point in the history
…ases (opensearch-project#10370)

* Segment Replication - Fix ShardLockObtained error during corruption cases

This change fixes a bug where shards could not be recreated locally after corruption.
This occured because the store was not decref'd to 0 if the commit on close would fail
with a corruption exception.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove exra logs

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove flaky assertion on store refcount

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove flaky test.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR Feedback.

Remove hacky handling of corruption when fetching metadata.  This will now check for store corruption
when replication has failed and fail the shard accordingly.

This commit also fixes logging in NRTReplicationEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix unit test.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix test failure testSegRepSucceedsOnPreviousCopiedFiles.

This test broke because we invoked target.indexShard on a closed replicationTarget.
In these cases we can assume the store is not corrupt.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* spotless

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Revert flaky IT

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix flakiness failure by expecting RTE when check index fails.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* reintroduce ITs and use recoveries API instead of waiting on shard state.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix edge case where flush failures would not get reported as corruption.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 authored and austintlee committed Oct 23, 2023
1 parent 352f0a5 commit 4a0be4e
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix class_cast_exception when passing int to _version and other metadata fields in ingest simulate API ([#10101](https://github.com/opensearch-project/OpenSearch/pull/10101))
- Fix circular dependency in Settings initialization ([10194](https://github.com/opensearch-project/OpenSearch/pull/10194))
- Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256))
- Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
return shardId.map(indexService::getShard).orElse(null);
}

protected boolean segmentReplicationWithRemoteEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -71,6 +73,7 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
Expand All @@ -82,6 +85,7 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

Expand All @@ -94,6 +98,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -1777,4 +1782,134 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {

}

public void testSendCorruptBytesToReplica() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) {
FileChunkRequest req = (FileChunkRequest) request;
logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
((FileChunkRequest) request).requestSeqNo(),
((FileChunkRequest) request).shardId(),
((FileChunkRequest) request).metadata(),
((FileChunkRequest) request).position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
assertNotEquals(originalRecoveryTime, 0);
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
waitForSearchableDocs(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

for (int i = 11; i < 21; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
waitForSearchableDocs(20, primaryNode, replicaNode);
}

private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
assertBusy(() -> {
// assert we have a peer recovery after the original
final long time = getRecoveryStopTime(replicaNode);
assertNotEquals(time, 0);
assertNotEquals(originalRecoveryTime, time);

}, 1, TimeUnit.MINUTES);
}

private long getRecoveryStopTime(String nodeName) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
logger.info("Recovery states {}", recoveryResponse);
for (RecoveryState recoveryState : recoveryStates) {
if (recoveryState.getTargetNode().getName().equals(nodeName)) {
return recoveryState.getTimer().stopTime();
}
}
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
try {
commitSegmentInfos();
} catch (IOException e) {
maybeFailEngine("flush", e);
throw new FlushFailedEngineException(shardId, e);
} finally {
flushLock.unlock();
Expand Down Expand Up @@ -437,13 +438,29 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
}
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
try {
commitSegmentInfos(latestSegmentInfos);
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
if (failEngineLock.isHeldByCurrentThread() == false && store.isMarkedCorrupted() == false) {
try {
store.markStoreCorrupted(e);
} catch (IOException ex) {
logger.warn("Unable to mark store corrupted", ex);
}
}
}
IOUtils.close(readerManager, translogManager);
} catch (Exception e) {
logger.warn("failed to close engine", e);
logger.error("failed to close engine", e);
} finally {
logger.debug("engine closed [{}]", reason);
closedLatch.countDown();
try {
store.decRef();
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
failIfCorrupted();
try {
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
} catch (NoSuchFileException | CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
import org.opensearch.common.lucene.Lucene;
Expand Down Expand Up @@ -261,9 +260,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
// broken. We have to clean up this shard entirely, remove all files and bubble it up.
try {
try {
store.removeCorruptionMarker();
Expand All @@ -279,14 +276,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse)
// In this case the shard is closed at some point while updating the reader.
// This can happen when the engine is closed in a separate thread.
logger.warn("Shard is already closed, closing replication");
} catch (OpenSearchException ex) {
} catch (CancellableThreads.ExecutionCancelledException ex) {
/*
Ignore closed replication target as it can happen due to index shard closed event in a separate thread.
In such scenario, ignore the exception
*/
assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled";
assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled";
} catch (Exception ex) {
throw new OpenSearchCorruptionException(ex);
throw new ReplicationFailedException(ex);
} finally {
if (store != null) {
store.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.ForceSyncRequest;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -522,7 +525,7 @@ public void onResponse(Void o) {
@Override
public void onFailure(Exception e) {
logger.debug("Replication failed {}", target.description());
if (e instanceof OpenSearchCorruptionException) {
if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
return;
}
Expand All @@ -531,6 +534,27 @@ public void onFailure(Exception e) {
});
}

private boolean isStoreCorrupt(SegmentReplicationTarget target) {
// ensure target is not already closed. In that case
// we can assume the store is not corrupt and that the replication
// event completed successfully.
if (target.refCount() > 0) {
final Store store = target.store();
if (store.tryIncRef()) {
try {
return store.isMarkedCorrupted();
} catch (IOException ex) {
logger.warn("Unable to determine if store is corrupt", ex);
return false;
} finally {
store.decRef();
}
}
}
// store already closed.
return false;
}

private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
Expand Down
Loading

0 comments on commit 4a0be4e

Please sign in to comment.