Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of Segment Replication classes #2468

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -555,7 +556,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
final Set<String> files = new HashSet<>();
for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) {
if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) {
files.add(file.name());
}
break;
Expand Down Expand Up @@ -615,7 +616,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
long reused = 0;
int filesRecovered = 0;
int filesReused = 0;
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final RecoveryIndex.FileDetail file : recoveryState.getIndex().fileDetails()) {
if (files.contains(file.name()) == false) {
recovered += file.length();
filesRecovered++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
return client().admin().indices().prepareStats(name).execute().actionGet();
}

private void validateIndexRecoveryState(RecoveryState.Index indexState) {
private void validateIndexRecoveryState(RecoveryIndex indexState) {
assertThat(indexState.time(), greaterThanOrEqualTo(0L));
assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@

package org.opensearch.common.concurrent;

import org.opensearch.common.util.concurrent.RefCounted;

/**
* Decorator class that wraps an object reference with a {@link Runnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedCloseable}
* Adapter class that enables a {@link RefCounted} implementation to function like an {@link AutoCloseable}.
* The {@link #close()} API invokes {@link RefCounted#decRef()} and ensures idempotency using a {@link OneWayGate}.
*/
public class GatedAutoCloseable<T> implements AutoCloseable {
public class AutoCloseableRefCounted<T extends RefCounted> implements AutoCloseable {

private final T ref;
private final Runnable onClose;
private final OneWayGate gate;

public GatedAutoCloseable(T ref, Runnable onClose) {
public AutoCloseableRefCounted(T ref) {
this.ref = ref;
this.onClose = onClose;
gate = new OneWayGate();
}

Expand All @@ -37,7 +36,7 @@ public T get() {
@Override
public void close() {
if (gate.close()) {
onClose.run();
ref.decRef();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Decorator class that wraps an object reference with a {@link CheckedRunnable} that is
* invoked when {@link #close()} is called. The internal {@link OneWayGate} instance ensures
* that this is invoked only once. See also {@link GatedAutoCloseable}
* that this is invoked only once. See also {@link AutoCloseableRefCounted}
*/
public class GatedCloseable<T> implements Closeable {

Expand Down
36 changes: 18 additions & 18 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -262,7 +262,7 @@ Runnable getGlobalCheckpointSyncer() {
@Nullable
private volatile RecoveryState recoveryState;

private volatile ReplicationState replicationState;
private volatile SegmentReplicationState segRepState;

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
Expand Down Expand Up @@ -413,7 +413,7 @@ public boolean shouldCache(Query query) {
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher);
this.replicationState = new ReplicationState();
this.segRepState = new SegmentReplicationState();
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3040,7 +3040,7 @@ protected Engine getEngineOrNull() {
public void startRecovery(
RecoveryState recoveryState,
SegmentReplicationReplicaService segmentReplicationReplicaService,
SegmentReplicationReplicaService.ReplicationListener replicationListener,
SegmentReplicationReplicaService.SegmentReplicationListener segRepListener,
PrimaryShardReplicationSource replicationSource,
PeerRecoveryTargetService peerRecoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand Down Expand Up @@ -3082,19 +3082,15 @@ public void startRecovery(
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
segRepListener.onReplicationDone(segRepState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true);
}
}
);
Expand Down Expand Up @@ -3748,15 +3744,19 @@ public synchronized void onNewCheckpoint(
checkpoint,
this,
source,
new SegmentReplicationReplicaService.ReplicationListener() {
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(ReplicationState state) {
public void onReplicationDone(SegmentReplicationState state) {
markReplicationComplete();
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
markReplicationComplete();
logger.error("Failure", e);
}
Expand All @@ -3767,20 +3767,20 @@ public void onReplicationFailure(ReplicationState state, ReplicationFailedExcept
}
}

public ReplicationState getReplicationState() {
return this.replicationState;
public SegmentReplicationState getReplicationState() {
return this.segRepState;
}

public void markAsReplicating() {
this.replicationState.setStage(ReplicationState.Stage.ACTIVE);
this.segRepState.setStage(SegmentReplicationState.Stage.ACTIVE);
}

public void markReplicationComplete() {
this.replicationState.setStage(ReplicationState.Stage.INACTIVE);
this.segRepState.setStage(SegmentReplicationState.Stage.INACTIVE);
}

private boolean isReplicating() {
return this.replicationState.getStage() == ReplicationState.Stage.ACTIVE;
return this.segRepState.getStage() == SegmentReplicationState.Stage.ACTIVE;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoveryIndex;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -177,7 +178,7 @@ void recoverFromLocalShards(
}

void addIndices(
final RecoveryState.Index indexRecoveryStats,
final RecoveryIndex indexRecoveryStats,
final Directory target,
final Sort indexSort,
final Directory[] sources,
Expand Down Expand Up @@ -232,9 +233,9 @@ void addIndices(
* Directory wrapper that records copy process for recovery statistics
*/
static final class StatsDirectoryWrapper extends FilterDirectory {
private final RecoveryState.Index index;
private final RecoveryIndex index;

StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
StatsDirectoryWrapper(Directory in, RecoveryIndex indexRecoveryStats) {
super(in);
this.index = indexRecoveryStats;
}
Expand Down Expand Up @@ -355,7 +356,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
+ "]";

if (logger.isTraceEnabled()) {
RecoveryState.Index index = recoveryState.getIndex();
RecoveryIndex index = recoveryState.getIndex();
StringBuilder sb = new StringBuilder();
sb.append(" index : files [")
.append(index.totalFileCount())
Expand Down Expand Up @@ -472,7 +473,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
final RecoveryIndex index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
Expand Down Expand Up @@ -510,7 +511,7 @@ private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws
assert indexShard.loadRetentionLeases().leases().isEmpty();
}

private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryIndex index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
public IndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationReplicaService segmentReplicationReplicaService,
final SegmentReplicationReplicaService.ReplicationListener replicationListener,
final SegmentReplicationReplicaService.SegmentReplicationListener segRepListener,
final PrimaryShardReplicationSource replicationSource,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand All @@ -859,7 +859,7 @@ public IndexShard createShard(
indexShard.startRecovery(
recoveryState,
segmentReplicationReplicaService,
replicationListener,
segRepListener,
replicationSource,
recoveryTargetService,
recoveryListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
import org.opensearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -630,10 +630,10 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
indicesService.createShard(
shardRouting,
segmentReplicationReplicaService,
new ReplicationListener(shardRouting, primaryTerm),
new ShardRoutingReplicationListener(shardRouting, primaryTerm),
replicationSource,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
new ShardRoutingRecoveryListener(shardRouting, primaryTerm),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
Expand Down Expand Up @@ -748,7 +748,7 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(
return sourceNode;
}

private class ReplicationListener implements SegmentReplicationReplicaService.ReplicationListener {
private class ShardRoutingReplicationListener implements SegmentReplicationReplicaService.SegmentReplicationListener {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was me being quick and dirty with the POC - I don't think we need a separate listener type for segrep. I think we could extract PeerRecoveryTargetService.RecoveryListener and use it for segrep with a common exception type. We will still need to perform the same actions that RecoveryListener now performs on success (publishing shardStarted) and failure (failing and removing the shard).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. I'll punt this to a future refactor


/**
* ShardRouting with which the shard was created
Expand All @@ -760,25 +760,25 @@ private class ReplicationListener implements SegmentReplicationReplicaService.Re
*/
private final long primaryTerm;

private ReplicationListener(final ShardRouting shardRouting, final long primaryTerm) {
private ShardRoutingReplicationListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}

@Override
public void onReplicationDone(final ReplicationState state) {
public void onReplicationDone(final SegmentReplicationState state) {
logger.info("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
handleRecoveryFailure(shardRouting, sendShardFailure, e);
logger.error("Shard setup failed", e);
}
}

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
private class ShardRoutingRecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

/**
* ShardRouting with which the shard was created
Expand All @@ -790,7 +790,7 @@ private class RecoveryListener implements PeerRecoveryTargetService.RecoveryList
*/
private final long primaryTerm;

private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
private ShardRoutingRecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}
Expand Down Expand Up @@ -1028,7 +1028,7 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
T createShard(
ShardRouting shardRouting,
SegmentReplicationReplicaService replicaService,
SegmentReplicationReplicaService.ReplicationListener replicationListener,
SegmentReplicationReplicaService.SegmentReplicationListener segRepListener,
PrimaryShardReplicationSource replicationSource,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

public class MultiFileWriter extends AbstractRefCounted implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
public MultiFileWriter(Store store, RecoveryIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
Expand All @@ -71,7 +71,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final RecoveryIndex indexState;
private final String tempFilePrefix;

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
Expand Down
Loading