Skip to content

Commit af0eba4

Browse files
[Segment Replication] Use engine codec and replica shard ReplicationCheckpoint for replication events (#7732) (#7769)
* [Segment Replication] Use engine codec and replica shard ReplicationCheckpoint for replication events * Spotless fix * Fix failing unit test * Address review comments * Address review comments rename * Address review comments * Add unit test for ForceSegmentSync transport handler * Add unit test for ForceSegmentSync transport failure --------- (cherry picked from commit adf7e2c) Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 6ea325c commit af0eba4

File tree

6 files changed

+165
-104
lines changed

6 files changed

+165
-104
lines changed

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,9 @@ public ReplicationCheckpoint getCheckpoint() {
5757
return this.checkpoint;
5858
}
5959

60-
public SegmentReplicationTarget(
61-
ReplicationCheckpoint checkpoint,
62-
IndexShard indexShard,
63-
SegmentReplicationSource source,
64-
ReplicationListener listener
65-
) {
60+
public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) {
6661
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
67-
this.checkpoint = checkpoint;
62+
this.checkpoint = indexShard.getLatestReplicationCheckpoint();
6863
this.source = source;
6964
this.state = new SegmentReplicationState(
7065
indexShard.routingEntry(),
@@ -101,7 +96,7 @@ public SegmentReplicationState state() {
10196
}
10297

10398
public SegmentReplicationTarget retryCopy() {
104-
return new SegmentReplicationTarget(checkpoint, indexShard, source, listener);
99+
return new SegmentReplicationTarget(indexShard, source, listener);
105100
}
106101

107102
@Override

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 41 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
228228
}
229229
final Thread thread = Thread.currentThread();
230230
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
231-
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
231+
startReplication(replicaShard, new SegmentReplicationListener() {
232232
@Override
233233
public void onReplicationDone(SegmentReplicationState state) {
234234
logger.trace(
@@ -301,17 +301,8 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec
301301
}
302302
}
303303

304-
public SegmentReplicationTarget startReplication(
305-
final ReplicationCheckpoint checkpoint,
306-
final IndexShard indexShard,
307-
final SegmentReplicationListener listener
308-
) {
309-
final SegmentReplicationTarget target = new SegmentReplicationTarget(
310-
checkpoint,
311-
indexShard,
312-
sourceFactory.get(indexShard),
313-
listener
314-
);
304+
public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
305+
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener);
315306
startReplication(target);
316307
return target;
317308
}
@@ -429,57 +420,49 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
429420
channel.sendResponse(TransportResponse.Empty.INSTANCE);
430421
return;
431422
}
432-
startReplication(
433-
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
434-
indexShard,
435-
new SegmentReplicationTargetService.SegmentReplicationListener() {
436-
@Override
437-
public void onReplicationDone(SegmentReplicationState state) {
438-
logger.trace(
439-
() -> new ParameterizedMessage(
440-
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
441-
indexShard.shardId().getId(),
442-
state.getReplicationId(),
443-
indexShard.getLatestReplicationCheckpoint(),
444-
state.getTimingData()
445-
)
446-
);
447-
try {
448-
// Promote engine type for primary target
449-
if (indexShard.recoveryState().getPrimary() == true) {
450-
indexShard.resetToWriteableEngine();
451-
}
452-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
453-
} catch (InterruptedException | TimeoutException | IOException e) {
454-
throw new RuntimeException(e);
423+
startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
424+
@Override
425+
public void onReplicationDone(SegmentReplicationState state) {
426+
logger.trace(
427+
() -> new ParameterizedMessage(
428+
"[shardId {}] [replication id {}] Replication complete to {}, timing data: {}",
429+
indexShard.shardId().getId(),
430+
state.getReplicationId(),
431+
indexShard.getLatestReplicationCheckpoint(),
432+
state.getTimingData()
433+
)
434+
);
435+
try {
436+
// Promote engine type for primary target
437+
if (indexShard.recoveryState().getPrimary() == true) {
438+
indexShard.resetToWriteableEngine();
455439
}
440+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
441+
} catch (InterruptedException | TimeoutException | IOException e) {
442+
throw new RuntimeException(e);
456443
}
444+
}
457445

458-
@Override
459-
public void onReplicationFailure(
460-
SegmentReplicationState state,
461-
ReplicationFailedException e,
462-
boolean sendShardFailure
463-
) {
464-
logger.trace(
465-
() -> new ParameterizedMessage(
466-
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
467-
indexShard.shardId().getId(),
468-
state.getReplicationId(),
469-
state.getTimingData()
470-
)
471-
);
472-
if (sendShardFailure == true) {
473-
indexShard.failShard("replication failure", e);
474-
}
475-
try {
476-
channel.sendResponse(e);
477-
} catch (IOException ex) {
478-
throw new RuntimeException(ex);
479-
}
446+
@Override
447+
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
448+
logger.trace(
449+
() -> new ParameterizedMessage(
450+
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
451+
indexShard.shardId().getId(),
452+
state.getReplicationId(),
453+
state.getTimingData()
454+
)
455+
);
456+
if (sendShardFailure == true) {
457+
indexShard.failShard("replication failure", e);
458+
}
459+
try {
460+
channel.sendResponse(e);
461+
} catch (IOException ex) {
462+
throw new RuntimeException(ex);
480463
}
481464
}
482-
);
465+
});
483466
}
484467
}
485468

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.index.store.StoreFileMetadata;
4141
import org.opensearch.index.translog.SnapshotMatchers;
4242
import org.opensearch.index.translog.Translog;
43+
import org.opensearch.indices.IndicesService;
4344
import org.opensearch.indices.recovery.RecoverySettings;
4445
import org.opensearch.indices.recovery.RecoveryTarget;
4546
import org.opensearch.indices.replication.CheckpointInfoResponse;
@@ -294,7 +295,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
294295
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
295296
IndexShard primaryShard = newStartedShard(true);
296297
SegmentReplicationTargetService sut;
297-
sut = prepareForReplication(primaryShard, null);
298+
sut = prepareForReplication(primaryShard, null, mock(TransportService.class), mock(IndicesService.class));
298299
SegmentReplicationTargetService spy = spy(sut);
299300

300301
// Starting a new shard in PrimaryMode and shard routing primary.
@@ -314,7 +315,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
314315
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);
315316

316317
// Verify that checkpoint is not processed as shard routing is primary.
317-
verify(spy, times(0)).startReplication(any(), any(), any());
318+
verify(spy, times(0)).startReplication(any(), any());
318319
closeShards(primaryShard);
319320
}
320321

@@ -1027,7 +1028,10 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun
10271028

10281029
private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
10291030
try {
1030-
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
1031+
final CopyState copyState = new CopyState(
1032+
ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()),
1033+
primary
1034+
);
10311035
listener.onResponse(
10321036
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
10331037
);
@@ -1041,7 +1045,6 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
10411045
throws InterruptedException {
10421046
CountDownLatch latch = new CountDownLatch(1);
10431047
final SegmentReplicationTarget target = targetService.startReplication(
1044-
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
10451048
replica,
10461049
new SegmentReplicationTargetService.SegmentReplicationListener() {
10471050
@Override

0 commit comments

Comments
 (0)