Skip to content

Commit

Permalink
Make RemoteSegmentFileChunkWriter class final.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Jun 8, 2022
1 parent 45f8f7e commit 2bfbf4a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
Expand Down Expand Up @@ -91,14 +92,10 @@ void cancelReplication(DiscoveryNode node) {
}
}

SegmentReplicationSourceHandler createTargetHandler(
DiscoveryNode node,
CopyState copyState,
RemoteSegmentFileChunkWriter segmentFileChunkWriter
) {
SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
return new SegmentReplicationSourceHandler(
node,
segmentFileChunkWriter,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
Expand Down Expand Up @@ -163,18 +160,13 @@ private synchronized void removeCopyState(CopyState copyState) {
* local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
* with the list of required files.
* @param request {@link CheckpointInfoRequest}
* @param segmentSegmentFileChunkWriter {@link RemoteSegmentFileChunkWriter} writer to handle sending files over the transport layer.
* @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer.
* @return {@link CopyState} the built CopyState for this replication event.
* @throws IOException - When there is an IO error building CopyState.
*/
CopyState prepareForReplication(CheckpointInfoRequest request, RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter)
throws IOException {
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
final SegmentReplicationSourceHandler handler = createTargetHandler(
request.getTargetNode(),
copyState,
segmentSegmentFileChunkWriter
);
final SegmentReplicationSourceHandler handler = createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter);
nodesToHandlers.putIfAbsent(request.getTargetNode(), handler);
return copyState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
*
* @opensearch.internal
*/
public class RemoteSegmentFileChunkWriter implements FileChunkWriter {
public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {

protected final AtomicLong requestSeqNoGenerator;
protected final RetryableTransportClient retryableTransportClient;
protected final ShardId shardId;
protected final RecoverySettings recoverySettings;
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
Expand All @@ -31,10 +32,6 @@
import java.util.List;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -97,18 +94,14 @@ public void testPrepareAndSendSegments() throws IOException {
replicaDiscoveryNode,
testCheckpoint
);
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
listener.onResponse(null);
};
final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertTrue(replications.isInCopyStateMap(request.getCheckpoint()));
assertEquals(1, replications.size());
assertEquals(1, copyState.refCount());

doAnswer((invocation -> {
final ActionListener<Void> listener = invocation.getArgument(5);
listener.onResponse(null);
return null;
})).when(segmentSegmentFileChunkWriter).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any());

getSegmentFilesRequest = new GetSegmentFilesRequest(
1L,
replica.routingEntry().allocationId().getId(),
Expand Down Expand Up @@ -145,7 +138,10 @@ public void testCancelReplication() throws IOException {
primaryDiscoveryNode,
testCheckpoint
);
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
// this shouldn't be called in this test.
Assert.fail();
};
final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertEquals(1, replications.size());
assertEquals(1, replications.cachedCopyStateSize());
Expand All @@ -164,7 +160,10 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException {
primaryDiscoveryNode,
testCheckpoint
);
final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
// this shouldn't be called in this test.
Assert.fail();
};

final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertEquals(1, copyState.refCount());
Expand Down

0 comments on commit 2bfbf4a

Please sign in to comment.