diff --git a/CHANGELOG.md b/CHANGELOG.md index bbe79abf3e38a..c2e4fc2fe1aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) +- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dcb7bdeb30e4f..9185ef0d440ce 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -163,8 +163,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -203,6 +203,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** @@ -1703,13 +1704,8 @@ public void prepareForIndexRecovery() { * @return a sequence number that an operation-based peer recovery can start with. * This is the first operation after the local checkpoint of the safe commit if exists. */ - public long recoverLocallyUpToGlobalCheckpoint() { - assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); - assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + private long recoverLocallyUpToGlobalCheckpoint() { + validateLocalRecoveryState(); final Optional safeCommit; final long globalCheckpoint; try { @@ -1792,6 +1788,54 @@ public long recoverLocallyUpToGlobalCheckpoint() { } } + public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { + if (localTranslog) { + return recoverLocallyUpToGlobalCheckpoint(); + } else { + return recoverLocallyUptoLastCommit(); + } + } + + /** + * The method figures out the sequence number basis the last commit. + * + * @return the starting sequence number from which the recovery should start. + */ + private long recoverLocallyUptoLastCommit() { + assert isRemoteTranslogEnabled() : "Remote translog store is not enabled"; + long seqNo; + validateLocalRecoveryState(); + + try { + seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO)); + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.error("skip local recovery as no index commit found", e); + return UNASSIGNED_SEQ_NO; + } catch (Exception e) { + logger.error("skip local recovery as failed to find the safe commit", e); + return UNASSIGNED_SEQ_NO; + } + + try { + maybeCheckIndex(); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.getTranslog().totalLocal(0); + } catch (Exception e) { + logger.error("check index failed during fetch seqNo", e); + return UNASSIGNED_SEQ_NO; + } + return seqNo; + } + + private void validateLocalRecoveryState() { + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); + } + recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -1998,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t private boolean assertSequenceNumbersInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; - assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; + assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" + userData.get(Engine.HISTORY_UUID_KEY) @@ -3275,6 +3319,10 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } + public boolean isRemoteTranslogEnabled() { + return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); + } + /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 85141556657f3..b5702431ed4bf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -36,10 +36,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.cluster.ClusterState; @@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request)); } + /** + * Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see + * github issue on it. + * @param recoveryId recovery id + * @param preExistingRequest start recovery request + */ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) { final String actionName; final TransportRequest requestToSend; @@ -238,10 +244,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; - startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); + startRequest = getStartRecoveryRequest( + logger, + clusterService.localNode(), + recoveryTarget, + startingSeqNo, + !remoteTranslogEnabled + ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; } catch (final Exception e) { @@ -270,21 +283,32 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi ); } + public static StartRecoveryRequest getStartRecoveryRequest( + Logger logger, + DiscoveryNode localNode, + RecoveryTarget recoveryTarget, + long startingSeqNo + ) { + return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true); + } + /** * Prepare the start recovery request. * - * @param logger the logger - * @param localNode the local node of the recovery target - * @param recoveryTarget the target of the recovery - * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. - * This is the first operation after the local checkpoint of the safe commit if exists. + * @param logger the logger + * @param localNode the local node of the recovery target + * @param recoveryTarget the target of the recovery + * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. + * This is the first operation after the local checkpoint of the safe commit if exists. + * @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata. * @return a start recovery request */ public static StartRecoveryRequest getStartRecoveryRequest( Logger logger, DiscoveryNode localNode, RecoveryTarget recoveryTarget, - long startingSeqNo + long startingSeqNo, + boolean verifyTranslog ) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); @@ -292,22 +316,25 @@ public static StartRecoveryRequest getStartRecoveryRequest( Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. - try { - final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); - assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; - } catch (IOException | TranslogCorruptedException e) { - logger.warn( - new ParameterizedMessage( - "error while reading global checkpoint from translog, " - + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - startingSeqNo = UNASSIGNED_SEQ_NO; + if (verifyTranslog) { + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene + // index. + try { + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn( + new ParameterizedMessage( + "error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } } } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. no need to log diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 9e219db5a4c96..665e79722770e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -316,60 +316,85 @@ && isTargetSameHistory() } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - sendFileStep.whenComplete(r -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); - }, onFailure); - - prepareEngineStep.whenComplete(prepareEngineTime -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); - /* - * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. - * This means that any document indexed into the primary after this will be replicated to this replica as well - * make sure to do this before sampling the max sequence number in the next step, to ensure that we send - * all documents up to maxSeqNo in phase2. - */ - RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads, - logger - ); - - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); - } - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( - PEER_RECOVERY_NAME, - startingSeqNo, - Long.MAX_VALUE, - false, - true - ); - resources.add(phase2Snapshot); - retentionLock.close(); + boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false + && shard.isRemoteTranslogEnabled(); + + if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(0, prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + retentionLock.close(); + sendSnapshotStep.onResponse(new SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)); + }, onFailure); + } else { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. - final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - final RetentionLeases retentionLeases = shard.getRetentionLeases(); - final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); - phase2( - startingSeqNo, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - sendSnapshotStep - ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( + PEER_RECOVERY_NAME, + startingSeqNo, + Long.MAX_VALUE, + false, + true + ); + resources.add(phase2Snapshot); + retentionLock.close(); + + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); + phase2( + startingSeqNo, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersionOnPrimary, + sendSnapshotStep + ); - }, onFailure); + }, onFailure); + } // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 final long trimAboveSeqNo = startingSeqNo - 1; diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java new file mode 100644 index 0000000000000..5d317693e02df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.junit.Assert; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.NRTReplicationEngine; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.WriteOnlyTranslogManager; +import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; + +public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .build(); + + public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs and flush + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + // Step 2 - Start replica for recovery to happen, check both has same number of docs + final IndexShard replica1 = shards.addReplica(); + shards.startAll(); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 3 - Index more docs, run segment replication, check both have same number of docs + int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 4 - Check both shard has expected number of doc count + assertDocCount(primary, numDocs + moreDocs); + assertDocCount(replica1, numDocs + moreDocs); + + // Step 5 - Start new replica, recovery happens, and check that new replica has docs upto last flush + final IndexShard replica2 = shards.addReplica(); + shards.startAll(); + assertDocCount(replica2, numDocs); + + // Step 6 - Segment replication, check all shards have same number of docs + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } + + public void testNoTranslogHistoryTransferred() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step1 - Start primary, index docs, flush, index more docs, check translog in primary as expected + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + List docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary); + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + assertEquals(moreDocs, getTranslog(primary).totalOperations()); + + // Step 2 - Start replica, recovery happens, check docs recovered till last flush + final IndexShard replica = shards.addReplica(); + shards.startAll(); + assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); + assertDocCount(replica, numDocs); + assertEquals(NRTReplicationEngine.class, replica.getEngine().getClass()); + + // Step 3 - Check replica's translog has no operations + assertEquals(WriteOnlyTranslogManager.class, replica.getEngine().translogManager().getClass()); + WriteOnlyTranslogManager replicaTranslogManager = (WriteOnlyTranslogManager) replica.getEngine().translogManager(); + assertEquals(0, replicaTranslogManager.getTranslog().totalOperations()); + + // Adding this for close to succeed + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } + + public void testStartSequenceForReplicaRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + final IndexShard replica = shards.addReplica(); + shards.startAll(); + + allowShardFailures(); + replica.failShard("test", null); + + final ShardRouting replicaRouting = replica.routingEntry(); + final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) + .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) + .build(); + closeShards(replica); + shards.removeReplica(replica); + + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + shards.flush(); + + IndexShard newReplicaShard = newShard( + newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + false, + ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE + ), + replica.shardPath(), + newIndexMetadata, + null, + null, + replica.getEngineFactory(), + replica.getEngineConfigFactory(), + replica.getGlobalCheckpointSyncer(), + replica.getRetentionLeaseSyncer(), + EMPTY_EVENT_LISTENER, + null + ); + shards.addReplica(newReplicaShard); + shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { + @Override + public IndexShard indexShard() { + IndexShard idxShard = super.indexShard(); + // verify the starting sequence number while recovering a failed shard which has a valid last commit + long startingSeqNo = -1; + try { + startingSeqNo = Long.parseLong( + idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO) + ); + } catch (IOException e) { + Assert.fail(); + } + assertEquals(numDocs - 1, startingSeqNo); + return idxShard; + } + }); + + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } +} diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 2a88345346e52..a50089831b3e9 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -211,7 +211,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { IndexShard shard = newShard(false); shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode)); shard.prepareForIndexRecovery(); - assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(shard.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -239,7 +239,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(globalCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -254,7 +254,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -276,10 +276,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception { replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); if (safeCommit.isPresent()) { - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0)); } else { - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); } assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG)); @@ -322,7 +322,7 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception { ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -349,7 +349,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception { shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); shard.prepareForIndexRecovery(); - long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint(); + long startingSeqNo = shard.recoverLocallyAndFetchStartSeqNo(true); shard.store().markStoreCorrupted(new IOException("simulated")); RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 09eca006d600a..1fcdfd79c544e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,7 +134,6 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -853,7 +852,9 @@ protected final void recoverUnstartedReplica( } replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + IndexShard indexShard = recoveryTarget.indexShard(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode,