[Segment Replication] Update peer recovery logic for segment replication #5344

Merged: 10 commits, Jan 6, 2023
Address review comment, move force segrep sync handler to SegRepTargetService

Signed-off-by: Suraj Singh <surajrider@gmail.com>
dreamer-89 committed Jan 5, 2023
commit aa5e3f308584a07c137ba10238e58b2459ae7c2f
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))



### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -794,7 +794,7 @@ private void waitForReplicaUpdate() throws Exception {
});
}

protected IndexShard getIndexShard(String node) {
private IndexShard getIndexShard(String node) {
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
@@ -131,7 +131,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
createIndex();
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = scaledRandomIntBetween(1, 100);
ingestDocs(initialDocCount);

logger.info("--> verifying count {}", initialDocCount);
@@ -183,10 +183,9 @@

final int finalDocCount = initialDocCount;
ingestDocs(finalDocCount);
refresh(INDEX_NAME);

logger.info(
"Verify all documents are available on both old primary and replica i.e. older primary is still refreshing replica nodes"
);
logger.info("Verify older primary is still refreshing replica nodes");
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1633,7 +1633,17 @@ static Engine.Searcher wrapSearcher(
}
}

public void resetEngine() throws IOException, InterruptedException, TimeoutException {
/**
* Used with segment replication during relocation handoff, this method updates current read only engine to global
* checkpoint followed by changing to writeable engine
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*
* @opensearch.internal
*/
public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}
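
For reference, a minimal sketch (not code from this commit) of the call pattern the rename supports, adapted from the listener that this commit removes from RecoveryTarget.forceSegmentFileSync later in the diff. The commit message indicates the real handler now lives in SegmentReplicationTargetService, whose changes are not part of this excerpt; segmentReplicationTargetService, indexShard and listener are assumed to be in scope, and error handling is simplified to report through the listener:

    segmentReplicationTargetService.startReplication(
        ReplicationCheckpoint.empty(indexShard.shardId()),
        indexShard,
        new SegmentReplicationTargetService.SegmentReplicationListener() {
            @Override
            public void onReplicationDone(SegmentReplicationState state) {
                try {
                    // Once the shard has caught up via a forced segment replication round,
                    // swap the read-only engine for a writeable engine at the global checkpoint.
                    indexShard.resetToWriteableEngine();
                    listener.onResponse(null);
                } catch (InterruptedException | TimeoutException | IOException e) {
                    listener.onFailure(e);
                }
            }

            @Override
            public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
                if (sendShardFailure) {
                    indexShard.failShard("replication failure", e);
                }
                listener.onFailure(e);
            }
        }
    );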

@@ -3348,7 +3358,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* promote to during relocation handoff post segment replication.
* change to a writeable engine during relocation handoff after a round of segment replication.
*/
boolean isReadOnlyReplica = indexSettings.isSegRepEnabled()
&& (shardRouting.primary() == false
@@ -65,7 +65,6 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationTimer;
@@ -112,7 +111,6 @@ public static class Actions {
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
public static final String FORCE_SYNC = "internal:index/shard/recovery/segments_sync";
}

private final ThreadPool threadPool;
@@ -124,21 +122,17 @@ public static class Actions {

private final ReplicationCollection<RecoveryTarget> onGoingRecoveries;

private final SegmentReplicationTargetService segmentReplicationTargetService;

public PeerRecoveryTargetService(
ThreadPool threadPool,
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
SegmentReplicationTargetService segmentReplicationTargetService
ClusterService clusterService
) {
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool);
this.segmentReplicationTargetService = segmentReplicationTargetService;

transportService.registerRequestHandler(
Actions.FILES_INFO,
@@ -182,12 +176,6 @@ public PeerRecoveryTargetService(
RecoveryHandoffPrimaryContextRequest::new,
new HandoffPrimaryContextRequestHandler()
);
transportService.registerRequestHandler(
Actions.FORCE_SYNC,
ThreadPool.Names.GENERIC,
ForceSyncRequest::new,
new ForceSyncTransportRequestHandler()
);
}

@Override
@@ -200,7 +188,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.start(
new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService),
new RecoveryTarget(indexShard, sourceNode, listener),
recoverySettings.activityTimeout()
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
@@ -575,20 +563,6 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
}
}

class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
@Override
public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.getRecoveryId(), request.getShardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FORCE_SYNC, request);
if (listener == null) {
return;
}
recoveryTarget.forceSegmentFileSync(listener);
}
}
}

class RecoveryRunner extends AbstractRunnable {

final long recoveryId;
@@ -809,7 +809,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.checkForCancel();
recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);

finalizeListener.whenComplete(r -> {
RunUnderPrimaryPermit.run(
() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
@@ -32,7 +32,6 @@

package org.opensearch.indices.recovery;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
Expand All @@ -57,9 +56,6 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
@@ -71,7 +67,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;

@@ -91,44 +86,19 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

private final SegmentReplicationTargetService segmentReplicationTargetService;

/**
* Creates a new recovery target object that represents a recovery to the provided shard.
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
* @param segmentReplicationTargetService used to force a segment replication round
*/
public RecoveryTarget(
IndexShard indexShard,
DiscoveryNode sourceNode,
ReplicationListener listener,
SegmentReplicationTargetService segmentReplicationTargetService
) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.sourceNode = sourceNode;
indexShard.recoveryStats().incCurrentAsTarget();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
this.segmentReplicationTargetService = segmentReplicationTargetService;
}

/**
* Creates a new recovery target object that represents a recovery to the provided shard. Used for tests.
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.sourceNode = sourceNode;
indexShard.recoveryStats().incCurrentAsTarget();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
this.segmentReplicationTargetService = SegmentReplicationTargetService.NO_OP;
}

/**
@@ -137,7 +107,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService);
return new RecoveryTarget(indexShard, sourceNode, listener);
}

public IndexShard indexShard() {
@@ -249,45 +219,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo

@Override
public void forceSegmentFileSync(ActionListener<Void> listener) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
try {
indexShard.resetEngine();
listener.onResponse(null);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
listener.onFailure(e);
}
}
);
throw new UnsupportedOperationException("Method not supported on target!");
}

@Override
@@ -45,6 +45,7 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.transport.EmptyTransportResponseHandler;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
@@ -195,7 +196,7 @@ public void indexTranslogOperations(
*/
@Override
public void forceSegmentFileSync(ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.FORCE_SYNC;
final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
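
The sender-side change above points the force-sync request at SegmentReplicationTargetService.Actions.FORCE_SYNC; the matching target-side registration is not shown in this excerpt. A plausible sketch, mirroring the registration removed from PeerRecoveryTargetService earlier in the diff (the handler class name is illustrative and now assumed to live inside SegmentReplicationTargetService):

    transportService.registerRequestHandler(
        SegmentReplicationTargetService.Actions.FORCE_SYNC,
        ThreadPool.Names.GENERIC,
        ForceSyncRequest::new,
        new ForceSyncTransportRequestHandler()  // illustrative: handles the forced segment replication round on the target
    );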