From 2bfbf4a57a3200590e8c8557f136557030913752 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 7 Jun 2022 22:25:17 -0700 Subject: [PATCH] Make RemoteSegmentFileChunkWriter class final. Signed-off-by: Marc Handalian --- .../OngoingSegmentReplications.java | 20 +++++---------- .../RemoteSegmentFileChunkWriter.java | 10 ++++---- .../OngoingSegmentReplicationsTests.java | 25 +++++++++---------- 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index fe4cbc255ec1e..8a1cf84b5327f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,6 +14,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; @@ -91,14 +92,10 @@ void cancelReplication(DiscoveryNode node) { } } - SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - CopyState copyState, - RemoteSegmentFileChunkWriter segmentFileChunkWriter - ) { + SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { return new SegmentReplicationSourceHandler( node, - segmentFileChunkWriter, + fileChunkWriter, copyState.getShard().getThreadPool(), copyState, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), @@ -163,18 +160,13 @@ private synchronized void removeCopyState(CopyState copyState) { * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas * with the list of required files. * @param request {@link CheckpointInfoRequest} - * @param segmentSegmentFileChunkWriter {@link RemoteSegmentFileChunkWriter} writer to handle sending files over the transport layer. + * @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer. * @return {@link CopyState} the built CopyState for this replication event. * @throws IOException - When there is an IO error building CopyState. */ - CopyState prepareForReplication(CheckpointInfoRequest request, RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter) - throws IOException { + CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - final SegmentReplicationSourceHandler handler = createTargetHandler( - request.getTargetNode(), - copyState, - segmentSegmentFileChunkWriter - ); + final SegmentReplicationSourceHandler handler = createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter); nodesToHandlers.putIfAbsent(request.getTargetNode(), handler); return copyState; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index f563f7359ab5a..05f1c9d757e5c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -31,12 +31,12 @@ * * @opensearch.internal */ -public class RemoteSegmentFileChunkWriter implements FileChunkWriter { +public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { - protected final AtomicLong requestSeqNoGenerator; - protected final RetryableTransportClient retryableTransportClient; - protected final ShardId shardId; - protected final RecoverySettings recoverySettings; + private final AtomicLong requestSeqNoGenerator; + private final RetryableTransportClient retryableTransportClient; + private final ShardId shardId; + private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index df81a91add6e4..9cc74276d2b1a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -19,6 +19,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; @@ -31,10 +32,6 @@ import java.util.List; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -97,18 +94,14 @@ public void testPrepareAndSendSegments() throws IOException { replicaDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); assertEquals(1, replications.size()); assertEquals(1, copyState.refCount()); - doAnswer((invocation -> { - final ActionListener listener = invocation.getArgument(5); - listener.onResponse(null); - return null; - })).when(segmentSegmentFileChunkWriter).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); - getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), @@ -145,7 +138,10 @@ public void testCancelReplication() throws IOException { primaryDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); assertEquals(1, replications.cachedCopyStateSize()); @@ -164,7 +160,10 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { primaryDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, copyState.refCount());