Commit

Segment Replication - Recover all translog ops when flipping to writeable engine. (opensearch-project#6352)

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Mar 4, 2023
1 parent 82e7e65 commit 4987ab6
Showing 1 changed file with 146 additions and 38 deletions.
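
For orientation, a minimal illustrative Java sketch of the recovery cut-off decision made in the engine-reset path of this diff; the class, method, and parameter names below are hypothetical, and only the ternary choice mirrors the committed code:

    // Hypothetical helper: mirrors the recoverUpto choice made when a read-only engine is reset to a writeable one.
    final class TranslogRecoveryCutoff {

        // With a remote translog store or segment replication, the local translog can hold
        // operations beyond the global checkpoint, so the whole translog is replayed
        // (Long.MAX_VALUE). Otherwise recovery still stops at the global checkpoint.
        static long recoverUpTo(boolean remoteTranslogEnabled, boolean segRepEnabled, long globalCheckpoint) {
            return (remoteTranslogEnabled || segRepEnabled) ? Long.MAX_VALUE : globalCheckpoint;
        }

        public static void main(String[] args) {
            System.out.println(recoverUpTo(false, true, 42L));  // segrep replica: 9223372036854775807 (replay everything local)
            System.out.println(recoverUpTo(false, false, 42L)); // docrep shard: 42 (stop at the global checkpoint)
        }
    }

In the diff, this value is passed to recoverFromTranslog in place of the bare global checkpoint.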
server/src/main/java/org/opensearch/index/shard/IndexShard.java (184 changes: 146 additions & 38 deletions)
@@ -55,7 +55,6 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
@@ -73,11 +72,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.io.stream.BytesStreamOutput;
@@ -93,6 +94,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
@@ -205,6 +207,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -217,6 +220,7 @@
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
import static org.opensearch.index.translog.Translog.Durability;

/**
* An OpenSearch index shard
@@ -319,7 +323,7 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final TranslogFactory translogFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexShard(
final ShardRouting shardRouting,
@@ -342,7 +346,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
@@ -360,7 +364,13 @@ public IndexShard(
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
this.threadPool = threadPool;
this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine);
this.translogSyncProcessor = createTranslogSyncProcessor(
logger,
threadPool,
this::getEngine,
indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings.getRemoteTranslogUploadBufferInterval()
);
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
@@ -429,7 +439,7 @@ public boolean shouldCache(Query query) {
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
this.translogFactorySupplier = translogFactorySupplier;
}

public ThreadPool getThreadPool() {
@@ -763,6 +773,14 @@ public void relocated(
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
// It is safe to sync the translog now; for remote-backed indexes this ensures the
// translog has been uploaded to the remote store.
if (syncTranslog) {
maybeSync();
}
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
@@ -799,6 +817,16 @@ public void relocated(
}
}

private void maybeSync() {
try {
if (isSyncNeeded()) {
sync();
}
} catch (IOException e) {
logger.warn("failed to sync translog", e);
}
}

private void verifyRelocatingState() {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
@@ -1472,22 +1500,53 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this target shard should start a round of segment replication.
* @return - True if the shard is able to perform segment replication.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
if (indexSettings.isSegRepEnabled() == false) {
logger.warn("Attempting to perform segment replication when it is not enabled on the index");
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
logger.warn("Shard is in primary mode and cannot perform segment replication as a replica.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
logger.warn("Shard is marked as primary and cannot perform segment replication as a replica");
return false;
}
if (state().equals(IndexShardState.STARTED) == false
&& (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(
() -> new ParameterizedMessage(
"Shard is not started or recovering {} {} and cannot perform segment replication as a replica",
state(),
shardRouting.state()
)
);
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Shard does not have the correct engine type to perform segment replication {}.",
getEngine().getClass()
)
);
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
@@ -1546,6 +1605,19 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
}
}

/**
* Fetch a map of StoreFileMetadata for each segment from the latest SegmentInfos.
* This is used to compute diffs for segment replication.
*
* @return - Map of Segment Filename to its {@link StoreFileMetadata}
* @throws IOException - When there is an error loading metadata from the store.
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap() throws IOException {
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return store.getSegmentMetadataMap(snapshot.get());
}
}

/**
* Fails the shard and marks the shard store as corrupted if
* <code>e</code> is caused by index corruption
@@ -1655,9 +1727,9 @@ static Engine.Searcher wrapSearcher(
* Used with segment replication during relocation handoff, this method updates the current read-only engine to the
* global checkpoint and then switches to a writeable engine.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
* @throws IOException if communication failed
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
*
* @opensearch.internal
*/
@@ -1713,8 +1785,7 @@ public void close(String reason, boolean flushEngine) throws IOException {
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
// Closing remoteStore as a part of IndexShard close. null check is handled by IOUtils
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, remoteStore);
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
indexShardOperationPermits.close();
}
}
@@ -2052,7 +2123,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot index
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings) == false) {
if (indexSettings.isRemoteSnapshot() == false) {
loadGlobalCheckpointToReplicationTracker();
}

@@ -2111,7 +2182,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contain a local checkpoint";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contain a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contain a history uuid";
@@ -2126,6 +2197,16 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
return true;
}

private Map<String, String> fetchUserData() throws IOException {
if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) {
// Inefficient method to support reading old Lucene indexes
return Lucene.readSegmentInfosExtendedCompatibility(store.directory(), indexSettings.getExtendedCompatibilitySnapshotVersion())
.getUserData();
} else {
return SegmentInfos.readLatestCommit(store.directory()).getUserData();
}
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine.translogManager()::getTranslogLastWriteLocation);
@@ -2893,7 +2974,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
assert assertPrimaryMode();
// only sync if there are no operations in flight, or when using async durability
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
final boolean asyncDurability = indexSettings().getTranslogDurability() == Durability.ASYNC;
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
@@ -2986,7 +3067,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
+ routingEntry()
+ "]";
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint()
|| indexSettings().getTranslogDurability() == Translog.Durability.ASYNC : "local checkpoint ["
|| indexSettings().getTranslogDurability() == Durability.ASYNC : "local checkpoint ["
+ getLocalCheckpoint()
+ "] does not match checkpoint from primary context ["
+ primaryContext
@@ -3406,7 +3487,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
translogFactory
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}

@@ -3763,21 +3845,41 @@ public List<String> getActiveOperations() {

private static AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(
Logger logger,
ThreadContext threadContext,
Supplier<Engine> engineSupplier
ThreadPool threadPool,
Supplier<Engine> engineSupplier,
boolean bufferAsyncIoProcessor,
TimeValue bufferInterval
) {
return new AsyncIOProcessor<Translog.Location>(logger, 1024, threadContext) {
ThreadContext threadContext = threadPool.getThreadContext();
CheckedConsumer<List<Tuple<Translog.Location, Consumer<Exception>>>, IOException> writeConsumer = candidates -> {
try {
engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close; this also conforms with the method's
// documentation
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
logger.debug("failed to sync translog", ex);
throw ex;
}
};
if (bufferAsyncIoProcessor) {
return new BufferedAsyncIOProcessor<>(logger, 102400, threadContext, threadPool, bufferInterval) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
writeConsumer.accept(candidates);
}

@Override
protected String getBufferProcessThreadPoolName() {
return ThreadPool.Names.TRANSLOG_SYNC;
}
};
}

return new AsyncIOProcessor<>(logger, 1024, threadContext) {
@Override
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
try {
engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1));
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close; this also conforms with the method's
// documentation
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
logger.debug("failed to sync translog", ex);
throw ex;
}
writeConsumer.accept(candidates);
}
};
}
@@ -3811,7 +3913,7 @@ public boolean isSyncNeeded() {
/**
* Returns the current translog durability mode
*/
public Translog.Durability getTranslogDurability() {
public Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

@@ -4209,9 +4311,15 @@ public void close() throws IOException {
// TODO: add a dedicate recovery stats for the reset translog
}
);

// When the new engine is created, translogs are synced from the remote store onto local storage. Since the remote
// store is the source of truth for the translog, we replay all translog operations that exist locally; otherwise,
// recovery happens only up to the global checkpoint. With segment replication we also replay all local translog ops,
// because on engine swap the local translog may hold more ops than the global checkpoint.
long recoverUpto = this.isRemoteTranslogEnabled() || indexSettings().isSegRepEnabled() ? Long.MAX_VALUE : globalCheckpoint;
newEngineReference.get()
.translogManager()
.recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), globalCheckpoint);
.recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), recoverUpto);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
verifyNotClosed();

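End note: the diff above also routes translog syncs through a BufferedAsyncIOProcessor when the remote translog store is enabled, flushing on getRemoteTranslogUploadBufferInterval via the TRANSLOG_SYNC thread pool. Below is a generic, self-contained sketch of that buffered-versus-immediate selection pattern; every class, interface, and method name in it is hypothetical and simplified, not OpenSearch's actual API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    final class TranslogSyncSketch {

        // Something that accepts translog locations and eventually syncs them to durable storage.
        interface SyncProcessor {
            void put(long location, Consumer<Exception> onDurable);
        }

        // Immediate mode: each submitted location is synced right away (per-request durability).
        static SyncProcessor immediate(Consumer<List<Long>> syncUpTo) {
            return (location, onDurable) -> {
                syncUpTo.accept(List.of(location));
                onDurable.accept(null);
            };
        }

        // Buffered mode: locations are batched and synced once per interval, so a remote translog
        // store sees fewer, larger uploads; callers are notified only after their batch is synced.
        static SyncProcessor buffered(Consumer<List<Long>> syncUpTo, ScheduledExecutorService pool, long intervalMillis) {
            final Object mutex = new Object();
            final List<Long> pendingLocations = new ArrayList<>();
            final List<Consumer<Exception>> pendingCallers = new ArrayList<>();
            pool.scheduleAtFixedRate(() -> {
                List<Long> locations;
                List<Consumer<Exception>> callers;
                synchronized (mutex) {
                    if (pendingLocations.isEmpty()) return;
                    locations = new ArrayList<>(pendingLocations);
                    callers = new ArrayList<>(pendingCallers);
                    pendingLocations.clear();
                    pendingCallers.clear();
                }
                syncUpTo.accept(locations);           // one batched sync per interval
                callers.forEach(c -> c.accept(null)); // notify after the batch is durable
            }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
            return (location, onDurable) -> {
                synchronized (mutex) {
                    pendingLocations.add(location);
                    pendingCallers.add(onDurable);
                }
            };
        }

        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            boolean remoteTranslogEnabled = true; // selection mirrors the boolean passed in the diff
            SyncProcessor processor = remoteTranslogEnabled
                ? buffered(batch -> System.out.println("synced " + batch), pool, 100)
                : immediate(batch -> System.out.println("synced " + batch));
            processor.put(7L, e -> System.out.println("caller notified"));
            Thread.sleep(300);
            pool.shutdownNow();
        }
    }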