[segment replication] Introducing common Replication interfaces for segment replication and recovery code paths #3234

Merged
@@ -101,7 +101,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
-import org.opensearch.indices.recovery.RecoveryState.Stage;
+import org.opensearch.indices.replication.common.ReplicationState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
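The swapped import above moves the Stage enum from RecoveryState onto the new common ReplicationState. A minimal sketch of what that shared type could look like, assuming Stage keeps the values it had on RecoveryState and that an accessor exists (the accessor name is a guess):

    package org.opensearch.indices.replication.common;

    // Hypothetical sketch only: a state abstraction shared by peer recovery
    // and segment replication, hoisting the Stage enum that previously lived
    // on RecoveryState. Names other than ReplicationState.Stage are assumptions.
    public interface ReplicationState {

        // Values mirror the pre-existing RecoveryState.Stage.
        enum Stage {
            INIT,
            INDEX,
            VERIFY_INDEX,
            TRANSLOG,
            FINALIZE,
            DONE
        }

        Stage getStage(); // assumed accessor
    }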
@@ -157,6 +157,7 @@
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
+import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.repositories.RepositoriesService;
@@ -2876,7 +2877,7 @@ protected Engine getEngineOrNull() {
public void startRecovery(
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
-PeerRecoveryTargetService.RecoveryListener recoveryListener,
+RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<MappingMetadata> mappingUpdateConsumer,
IndicesService indicesService
@@ -2909,7 +2910,7 @@ public void startRecovery(
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
} catch (Exception e) {
failShard("corrupted preexisting index", e);
-recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
+recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
break;
case SNAPSHOT:
@@ -2984,15 +2985,15 @@ public void startRecovery(
private void executeRecovery(
String reason,
RecoveryState recoveryState,
-PeerRecoveryTargetService.RecoveryListener recoveryListener,
+RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action
) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
-recoveryListener.onRecoveryDone(recoveryState);
+recoveryListener.onDone(recoveryState);
}
-}, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
+}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}

/**
@@ -136,6 +136,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
+import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.node.Node;
import org.opensearch.plugins.IndexStorePlugin;
@@ -839,7 +840,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
public IndexShard createShard(
final ShardRouting shardRouting,
final PeerRecoveryTargetService recoveryTargetService,
-final PeerRecoveryTargetService.RecoveryListener recoveryListener,
+final RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
@@ -78,7 +78,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.PeerRecoverySourceService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
-import org.opensearch.indices.recovery.RecoveryFailedException;
+import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
@@ -204,6 +204,18 @@ public IndicesClusterStateService(
this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
}

+public ShardStateAction getShardStateAction() {
+return shardStateAction;
+}
+
+public ClusterService getClusterService() {
[Collaborator review comment] Not used, please remove.
+return clusterService;
+}
+
+public ActionListener<Void> getShardStateActionListener() {
+return SHARD_STATE_ACTION_LISTENER;
+}
+
@Override
protected void doStart() {
// Doesn't make sense to manage shards on non-master and non-data nodes
@@ -624,7 +636,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
indicesService.createShard(
shardRouting,
recoveryTargetService,
-new RecoveryListener(shardRouting, primaryTerm),
+new RecoveryListener(shardRouting, primaryTerm, this),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
@@ -739,36 +751,8 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(
return sourceNode;
}

-private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
-
-/**
-* ShardRouting with which the shard was created
-*/
-private final ShardRouting shardRouting;
-
-/**
-* Primary term with which the shard was created
-*/
-private final long primaryTerm;
-
-private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
-this.shardRouting = shardRouting;
-this.primaryTerm = primaryTerm;
-}
-
-@Override
-public void onRecoveryDone(final RecoveryState state) {
-shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
-}
-
-@Override
-public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
-handleRecoveryFailure(shardRouting, sendShardFailure, e);
-}
-}

-// package-private for testing
-synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
+public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}
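The inner class removed above survives as a standalone org.opensearch.indices.recovery.RecoveryListener, constructed earlier in this file as new RecoveryListener(shardRouting, primaryTerm, this). A sketch of how it could be wired through the getters this diff adds; the field names and the cast in onDone are assumptions:

    package org.opensearch.indices.recovery;

    import org.opensearch.OpenSearchException;
    import org.opensearch.cluster.routing.ShardRouting;
    import org.opensearch.indices.cluster.IndicesClusterStateService;
    import org.opensearch.indices.replication.common.ReplicationListener;
    import org.opensearch.indices.replication.common.ReplicationState;

    // Sketch: the extracted listener keeps the old inner class's behavior, but
    // reaches back into IndicesClusterStateService via the public
    // getShardStateAction()/getShardStateActionListener() getters and the
    // now-public handleRecoveryFailure(...) added in this diff.
    public class RecoveryListener implements ReplicationListener {

        private final ShardRouting shardRouting; // ShardRouting with which the shard was created
        private final long primaryTerm;          // primary term with which the shard was created
        private final IndicesClusterStateService indicesClusterStateService;

        public RecoveryListener(ShardRouting shardRouting, long primaryTerm, IndicesClusterStateService service) {
            this.shardRouting = shardRouting;
            this.primaryTerm = primaryTerm;
            this.indicesClusterStateService = service;
        }

        @Override
        public void onDone(ReplicationState state) {
            // Recovery call sites pass a RecoveryState, so the downcast is safe there.
            RecoveryState recoveryState = (RecoveryState) state;
            indicesClusterStateService.getShardStateAction()
                .shardStarted(
                    shardRouting,
                    primaryTerm,
                    "after " + recoveryState.getRecoverySource(),
                    indicesClusterStateService.getShardStateActionListener()
                );
        }

        @Override
        public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
            indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
        }
    }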

@@ -1004,7 +988,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
T createShard(
ShardRouting shardRouting,
PeerRecoveryTargetService recoveryTargetService,
-PeerRecoveryTargetService.RecoveryListener recoveryListener,
+RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
@@ -37,10 +37,10 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
+import org.opensearch.ExceptionsHelper;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
-import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ChannelActionListener;
@@ -210,7 +210,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal
private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) {
RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout);
if (newTarget != null) {
-threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId()));
+threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId()));
}
}
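Throughout this diff, recoveryTarget.recoveryId() becomes recoveryTarget.getId() and recoveryTarget.shardId() becomes recoveryTarget.indexShard().shardId(), which suggests RecoveryTarget now inherits both from a common replication target base class. A rough sketch of that base under those assumptions (the class and member names are guesses consistent with the replication.common package in the new imports):

    package org.opensearch.indices.replication.common;

    import java.util.concurrent.atomic.AtomicLong;

    import org.opensearch.index.shard.IndexShard;

    // Sketch: a shared base for recovery and segment-replication targets.
    // RecoveriesCollection keys its map on getId(), so ids must be unique per
    // node, as the old per-recovery ids were.
    public abstract class ReplicationTarget {

        private static final AtomicLong ID_GENERATOR = new AtomicLong();

        private final long id = ID_GENERATOR.incrementAndGet();
        private final IndexShard indexShard;

        protected ReplicationTarget(IndexShard indexShard) {
            this.indexShard = indexShard;
        }

        // Replaces the recovery-specific recoveryId().
        public long getId() {
            return id;
        }

        // Call sites now reach the ShardId via indexShard().shardId() instead
        // of a shardId() convenience method on the target itself.
        public IndexShard indexShard() {
            return indexShard;
        }
    }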

@@ -237,7 +237,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final IndexShard indexShard = recoveryTarget.indexShard();
indexShard.preRecovery();
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
logger.trace("{} preparing shard for peer recovery", recoveryTarget.indexShard().shardId());
indexShard.prepareForIndexRecovery();
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
@@ -288,7 +288,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
long startingSeqNo
) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
logger.trace("{} collecting local files for [{}]", recoveryTarget.indexShard().shardId(), recoveryTarget.sourceNode());

Store.MetadataSnapshot metadataSnapshot;
try {
@@ -331,31 +331,20 @@ public static StartRecoveryRequest getStartRecoveryRequest(
}
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
}
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
logger.trace("{} local file count [{}]", recoveryTarget.indexShard().shardId(), metadataSnapshot.size());
request = new StartRecoveryRequest(
-recoveryTarget.shardId(),
+recoveryTarget.indexShard().shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
localNode,
metadataSnapshot,
recoveryTarget.state().getPrimary(),
-recoveryTarget.recoveryId(),
+recoveryTarget.getId(),
startingSeqNo
);
return request;
}

-/**
-* The recovery listener
-*
-* @opensearch.internal
-*/
-public interface RecoveryListener {
-void onRecoveryDone(RecoveryState state);
-
-void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
-}
-
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {

@Override
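The PeerRecoveryTargetService.RecoveryListener interface deleted above is superseded by the common ReplicationListener imported in RecoveriesCollection and implemented by the new RecoveryListener class. Judging from the onDone/onFailure call sites in this diff, the replacement plausibly looks like the sketch below; the exception parameter type is an assumption (the call sites pass RecoveryFailedException, which would fit a broader OpenSearchException):

    package org.opensearch.indices.replication.common;

    import org.opensearch.OpenSearchException;

    // Sketch: one listener contract shared by recovery and segment replication,
    // replacing the recovery-specific onRecoveryDone/onRecoveryFailure methods.
    public interface ReplicationListener {

        void onDone(ReplicationState state);

        void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
    }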
@@ -43,6 +43,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardId;
+import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
@@ -76,28 +77,23 @@ public RecoveriesCollection(Logger logger, ThreadPool threadPool) {
*
* @return the id of the new recovery.
*/
-public long startRecovery(
-IndexShard indexShard,
-DiscoveryNode sourceNode,
-PeerRecoveryTargetService.RecoveryListener listener,
-TimeValue activityTimeout
-) {
+public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener, TimeValue activityTimeout) {
RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener);
startRecoveryInternal(recoveryTarget, activityTimeout);
-return recoveryTarget.recoveryId();
+return recoveryTarget.getId();
}

private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) {
-RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget);
+RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.getId(), recoveryTarget);
assert existingTarget == null : "found two RecoveryStatus instances with the same id";
[Collaborator review comment] Suggested change:
-assert existingTarget == null : "found two RecoveryStatus instances with the same id";
+assert existingTarget == null : "found two RecoveryTarget instances with the same id";

logger.trace(
"{} started recovery from {}, id [{}]",
-recoveryTarget.shardId(),
+recoveryTarget.indexShard().shardId(),
recoveryTarget.sourceNode(),
-recoveryTarget.recoveryId()
+recoveryTarget.getId()
);
threadPool.schedule(
-new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout),
+new RecoveryMonitor(recoveryTarget.getId(), recoveryTarget.lastAccessTime(), activityTimeout),
activityTimeout,
ThreadPool.Names.GENERIC
);
@@ -131,21 +127,21 @@ public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activ
if (successfulReset) {
logger.trace(
"{} restarted recovery from {}, id [{}], previous id [{}]",
-newRecoveryTarget.shardId(),
+newRecoveryTarget.indexShard().shardId(),
newRecoveryTarget.sourceNode(),
-newRecoveryTarget.recoveryId(),
-oldRecoveryTarget.recoveryId()
+newRecoveryTarget.getId(),
+oldRecoveryTarget.getId()
);
return newRecoveryTarget;
} else {
logger.trace(
"{} recovery could not be reset as it is already cancelled, recovery from {}, id [{}], previous id [{}]",
-newRecoveryTarget.shardId(),
+newRecoveryTarget.indexShard().shardId(),
newRecoveryTarget.sourceNode(),
-newRecoveryTarget.recoveryId(),
-oldRecoveryTarget.recoveryId()
+newRecoveryTarget.getId(),
+oldRecoveryTarget.getId()
);
-cancelRecovery(newRecoveryTarget.recoveryId(), "recovery cancelled during reset");
+cancelRecovery(newRecoveryTarget.getId(), "recovery cancelled during reset");
return null;
}
} catch (Exception e) {
@@ -180,7 +176,7 @@ public RecoveryRef getRecoverySafe(long id, ShardId shardId) {
if (recoveryRef == null) {
throw new IndexShardClosedException(shardId);
}
-assert recoveryRef.get().shardId().equals(shardId);
+assert recoveryRef.get().indexShard().shardId().equals(shardId);
return recoveryRef;
}

@@ -191,9 +187,9 @@ public boolean cancelRecovery(long id, String reason) {
if (removed != null) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
-removed.shardId(),
+removed.indexShard().shardId(),
removed.sourceNode(),
-removed.recoveryId(),
+removed.getId(),
reason
);
removed.cancel(reason);
@@ -214,9 +210,9 @@ public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFa
if (removed != null) {
logger.trace(
"{} failing recovery from {}, id [{}]. Send shard failure: [{}]",
-removed.shardId(),
+removed.indexShard().shardId(),
removed.sourceNode(),
-removed.recoveryId(),
+removed.getId(),
sendShardFailure
);
removed.fail(e, sendShardFailure);
@@ -227,7 +223,12 @@ public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFa
public void markRecoveryAsDone(long id) {
RecoveryTarget removed = onGoingRecoveries.remove(id);
if (removed != null) {
logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId());
logger.trace(
"{} marking recovery from {} as done, id [{}]",
removed.indexShard().shardId(),
removed.sourceNode(),
removed.getId()
);
removed.markAsDone();
}
}
@@ -250,7 +251,7 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
synchronized (onGoingRecoveries) {
for (Iterator<RecoveryTarget> it = onGoingRecoveries.values().iterator(); it.hasNext();) {
RecoveryTarget status = it.next();
-if (status.shardId().equals(shardId)) {
+if (status.indexShard().shardId().equals(shardId)) {
matchedRecoveries.add(status);
it.remove();
}
@@ -259,9 +260,9 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
for (RecoveryTarget removed : matchedRecoveries) {
logger.trace(
"{} canceled recovery from {}, id [{}] (reason [{}])",
-removed.shardId(),
+removed.indexShard().shardId(),
removed.sourceNode(),
-removed.recoveryId(),
+removed.getId(),
reason
);
removed.cancel(reason);