diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 33bc5a8f3afe6..0bb22ea99282e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -48,6 +49,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.Preference; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; @@ -58,6 +60,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; @@ -71,6 +74,7 @@ import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; @@ -82,6 +86,7 @@ import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.Before; @@ -94,6 +99,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -1777,4 +1783,133 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException { } + public void testSendCorruptBytesToReplica() throws Exception { + // this test stubs transport calls specific to node-node replication. + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + segmentReplicationWithRemoteEnabled() + ); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + primaryTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) { + FileChunkRequest req = (FileChunkRequest) request; + logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk()); + TransportRequest corrupt = new FileChunkRequest( + req.recoveryId(), + ((FileChunkRequest) request).requestSeqNo(), + ((FileChunkRequest) request).shardId(), + ((FileChunkRequest) request).metadata(), + ((FileChunkRequest) request).position(), + new BytesArray("test"), + false, + 0, + 0L + ); + connection.sendRequest(requestId, action, corrupt, options); + latch.countDown(); + } else { + connection.sendRequest(requestId, action, request, options); + } + } + ); + for (int i = 0; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + final long originalRecoveryTime = getRecoveryStopTime(); + assertNotEquals(originalRecoveryTime, 0); + refresh(INDEX_NAME); + latch.await(); + assertTrue(failed.get()); + waitForNewPeerRecovery(originalRecoveryTime); + // reset checkIndex to ensure our original shard doesn't throw + resetCheckIndexStatus(); + assertDocCounts(100, primaryNode, replicaNode); + } + + public void testWipeSegmentBetweenSyncs() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + for (int i = 0; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + ensureGreen(INDEX_NAME); + final long originalRecoveryTime = getRecoveryStopTime(); + + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + waitForSearchableDocs(INDEX_NAME, 100, List.of(replicaNode)); + indexShard.store().directory().deleteFile("_0.si"); + + for (int i = 101; i < 201; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + waitForNewPeerRecovery(originalRecoveryTime); + resetCheckIndexStatus(); + assertDocCounts(200, primaryNode, replicaNode); + } + + private static void waitForNewPeerRecovery(long originalRecoveryTime) throws Exception { + assertBusy(() -> { + // assert we have a peer recovery after the original + final long time = getRecoveryStopTime(); + assertNotEquals(time, 0); + assertNotEquals(originalRecoveryTime, time); + + }, 1, TimeUnit.MINUTES); + } + + private static long getRecoveryStopTime() { + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get(); + final List recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); + for (RecoveryState recoveryState : recoveryStates) { + if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.PEER) { + return recoveryState.getTimer().stopTime(); + } + } + return 0L; + } }