
Segment Replication - Fix ShardLockObtained error during corruption cases #10370

Merged
merged 12 commits on Oct 5, 2023
Revert flaky IT
Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Oct 5, 2023
commit 8eca8921831f53363f07b7586abf737d5b95a9f7
@@ -58,13 +58,10 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
@@ -73,8 +70,6 @@
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
@@ -87,7 +82,6 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

@@ -100,7 +94,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
@@ -1784,88 +1777,4 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {

}

public void testSendCorruptBytesToReplica() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
    TransportService.class,
    primaryNode
);
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
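// Intercept outbound transport messages from the primary to the replica so the
// first file chunk sent during segment replication can be swapped for corrupt bytes.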
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
        if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.compareAndSet(false, true)) {
FileChunkRequest req = (FileChunkRequest) request;
logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
                req.requestSeqNo(),
                req.shardId(),
                req.metadata(),
                req.position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
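// Index a batch of docs; the refresh below publishes a new checkpoint and kicks off
// a replication round whose file chunks pass through the corrupting stub above.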
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
assertBusy(() -> {
// wait until the original shard is closed.
assertEquals(IndexShardState.CLOSED, indexShard.state());
}, 1, TimeUnit.MINUTES);
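// Receiving a corrupt chunk should fail and close the replica shard; the cluster
// is then expected to recover a fresh copy of the shard on the same node.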
waitForActiveShardOnNode(replicaNode);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
assertDocCounts(100, primaryNode, replicaNode);
}

private void waitForActiveShardOnNode(String replicaNode) throws Exception {
assertBusy(() -> {
// wait until the shard is created
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
        IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
        assertNotNull(indexShard);
        assertFalse(indexShard.store().isMarkedCorrupted());
        assertEquals(IndexShardState.STARTED, indexShard.state());
}, 1, TimeUnit.MINUTES);
}
}