Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Draft - Create Segrep orchestration class & Target implementation #3409

Closed
wants to merge 11 commits into from
Prev Previous commit
Next Next commit
Code cleanup from PR review.
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed May 24, 2022
commit 6ddcf34fc0cea060f6ba6938940060bbb995db77
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
import java.io.IOException;

/**
* Request object for fetching Segment Metadata from a {@link ReplicationCheckpoint}
* Request object for fetching Segment Metadata for a {@link ReplicationCheckpoint} from a {@link SegmentReplicationSource}.
*
* @opensearch.internal
*/
public class GetCheckpointInfoRequest extends SegmentReplicationTransportRequest {
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public GetCheckpointInfoRequest(StreamInput in) throws IOException {
public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public GetCheckpointInfoRequest(
public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

/**
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos
* associated with a particular {@link ReplicationCheckpoint}.
* associated with a particular {@link ReplicationCheckpoint}. The {@link SegmentReplicationSource} may determine that
* the requested {@link ReplicationCheckpoint} is behind and return a different {@link ReplicationCheckpoint} in this response.
*
* @opensearch.internal
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

/**
* TransportRequest to fetch segments from another Node.
* Request includes a list of {@link StoreFileMetadata} of the files to copy and
* the related {@link ReplicationCheckpoint}.
*
* @opensearch.internal
*/
public class GetFilesRequest extends SegmentReplicationTransportRequest {
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetFilesRequest(
public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
Expand All @@ -38,7 +40,7 @@ public GetFilesRequest(
this.checkpoint = checkpoint;
}

public GetFilesRequest(StreamInput in) throws IOException {
public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
*
* @opensearch.internal
*/
public class GetFilesResponse extends TransportResponse {
public class GetSegmentFilesResponse extends TransportResponse {

public GetFilesResponse() {}
public GetSegmentFilesResponse() {}

public GetFilesResponse(StreamInput streamInput) {
public GetSegmentFilesResponse(StreamInput streamInput) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public PeerReplicationSource(
public void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint, StepListener<CheckpointInfoResponse> listener) {
final Writeable.Reader<CheckpointInfoResponse> reader = CheckpointInfoResponse::new;
final ActionListener<CheckpointInfoResponse> responseListener = ActionListener.map(listener, r -> r);
GetCheckpointInfoRequest request = new GetCheckpointInfoRequest(replicationId, allocationId, localNode, checkpoint);
CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, localNode, checkpoint);
transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader);
}

Expand All @@ -60,12 +60,12 @@ public void getFiles(
ReplicationCheckpoint checkpoint,
Store store,
List<StoreFileMetadata> filesToFetch,
StepListener<GetFilesResponse> listener
StepListener<GetSegmentFilesResponse> listener
) {
final Writeable.Reader<GetFilesResponse> reader = GetFilesResponse::new;
final ActionListener<GetFilesResponse> responseListener = ActionListener.map(listener, r -> r);
final Writeable.Reader<GetSegmentFilesResponse> reader = GetSegmentFilesResponse::new;
final ActionListener<GetSegmentFilesResponse> responseListener = ActionListener.map(listener, r -> r);

GetFilesRequest request = new GetFilesRequest(replicationId, allocationId, localNode, filesToFetch, checkpoint);
GetSegmentFilesRequest request = new GetSegmentFilesRequest(replicationId, allocationId, localNode, filesToFetch, checkpoint);
transportClient.executeRetryableAction(GET_FILES, request, responseListener, reader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ void getFiles(
ReplicationCheckpoint checkpoint,
Store store,
List<StoreFileMetadata> filesToFetch,
StepListener<GetFilesResponse> listener
StepListener<GetSegmentFilesResponse> listener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ protected SegmentReplicationTransportRequest(StreamInput in) throws IOException
targetNode = new DiscoveryNode(in);
}

protected SegmentReplicationTransportRequest(long replicationId, String targetAllocationId, DiscoveryNode discoveryNode) {
protected SegmentReplicationTransportRequest(long replicationId, String targetAllocationId, DiscoveryNode targetNode) {
this.replicationId = replicationId;
this.targetAllocationId = targetAllocationId;
this.targetNode = discoveryNode;
this.targetNode = targetNode;
}

@Override
Expand Down