From bdb92d478dc83f7f448dd3e4dc76f845cbc5d58b Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 25 Nov 2023 19:41:27 +0530 Subject: [PATCH] Fix failing test and incorporate PR review feedback Signed-off-by: Ashish Singh --- .../opensearch/index/shard/IndexShard.java | 2 +- .../translog/InternalTranslogManager.java | 4 +- .../index/translog/NoOpTranslogManager.java | 2 +- .../index/translog/RemoteFsTranslog.java | 46 +++++++++---------- .../opensearch/index/translog/Translog.java | 4 +- .../index/translog/TranslogManager.java | 2 +- 6 files changed, 29 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a70fdaaab014c..1eda442197d79 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -868,7 +868,7 @@ public void relocated( } // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. - releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSyncToStore()); + releasablesOnNoHandoff.add(getEngineOrNull().translogManager().drainSync()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index eabf08d85fa7c..7f930eeb5ac4d 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -308,8 +308,8 @@ public void onDelete() { } @Override - public Releasable drainSyncToStore() { - return translog.drainSyncToStore(); + public Releasable drainSync() { + return translog.drainSync(); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 33b64a20e934b..b4aa7865570a6 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -126,7 +126,7 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea public void onDelete() {} @Override - public Releasable drainSyncToStore() { + public Releasable drainSync() { return () -> {}; } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 0a4c4e5801cad..eb18cddc9e11c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; @@ -78,8 +79,8 @@ public class RemoteFsTranslog extends Translog { private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); // These permits exist to allow any inflight background triggered upload. - private static final int UPLOAD_PERMITS = 1; - private final Semaphore uploadPermits = new Semaphore(UPLOAD_PERMITS); + private static final int SYNC_PERMITS = 1; + private final Semaphore syncPermits = new Semaphore(SYNC_PERMITS); public RemoteFsTranslog( TranslogConfig config, @@ -321,18 +322,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException { - // During primary relocation (primary-primary peer recovery), both the old and the new primary have engine - // created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check - // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new - // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns - // downloads all the translogs from remote store and does a flush before the relocation finishes. - if (primaryModeSupplier.getAsBoolean() == false || uploadPermits.tryAcquire(1) == false) { - logger.debug( - "skipped uploading translog for {} {} uploadPermits={}", - primaryTerm, - generation, - uploadPermits.availablePermits() - ); + // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having + // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` + // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - + // 1. Using primaryModeSupplier, we prevent the new primary to do pre-emptive syncs + // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. + if (primaryModeSupplier.getAsBoolean() == false || syncPermits.tryAcquire(1) == false) { + logger.debug("skipped uploading translog for {} {} uploadPermits={}", primaryTerm, generation, syncPermits.availablePermits()); // NO-OP return true; } @@ -352,7 +348,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); } finally { - uploadPermits.release(1); + syncPermits.release(1); } } @@ -436,13 +432,13 @@ protected void setMinSeqNoToKeep(long seqNo) { } @Override - protected Releasable drainSyncToStore() { + protected Releasable drainSync() { try { - if (uploadPermits.tryAcquire(UPLOAD_PERMITS, 1, TimeUnit.MINUTES)) { + if (syncPermits.tryAcquire(SYNC_PERMITS, 1, TimeUnit.MINUTES)) { logger.info("All permits acquired"); return Releasables.releaseOnce(() -> { - uploadPermits.release(UPLOAD_PERMITS); - assert uploadPermits.availablePermits() == UPLOAD_PERMITS : "Available permits is " + uploadPermits.availablePermits(); + syncPermits.release(SYNC_PERMITS); + assert syncPermits.availablePermits() == SYNC_PERMITS : "Available permits is " + syncPermits.availablePermits(); logger.info("All permits released"); }); } else { @@ -458,6 +454,12 @@ public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); + // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote + // store. + if (syncPermits.availablePermits() == 0) { + return; + } + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. @@ -535,11 +537,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th } protected void onDelete() { - if (primaryModeSupplier.getAsBoolean() == false) { - logger.trace("skipped delete translog"); - // NO-OP - return; - } + ClusterService.assertClusterOrClusterManagerStateThread(); // clean up all remote translog files translogTransferManager.delete(); } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 6e08d7c4d7bdb..cb64db19c8337 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1820,8 +1820,8 @@ protected void onDelete() {} /** * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. */ - protected Releasable drainSyncToStore() { - return () -> {}; + protected Releasable drainSync() { + return () -> {}; // noop } /** diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index d66f12dc3da81..e1a0b7d1c1293 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -139,7 +139,7 @@ public interface TranslogManager { /** * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. */ - Releasable drainSyncToStore(); + Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); }