-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Segment Replication] Fixing flaky test failure happening for testShardAlreadyReplicating() #3943
Changes from 3 commits
683880c
571d49e
f93d316
96dc44f
076f0ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
import org.opensearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.mock; | ||
|
@@ -32,6 +34,7 @@ | |
import static org.mockito.Mockito.verify; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.eq; | ||
|
||
public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { | ||
|
||
|
@@ -40,6 +43,9 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { | |
private SegmentReplicationSource replicationSource; | ||
private SegmentReplicationTargetService sut; | ||
|
||
private ReplicationCheckpoint cp; | ||
private ReplicationCheckpoint newCheckpoint; | ||
|
||
@Override | ||
public void setUp() throws Exception { | ||
super.setUp(); | ||
|
@@ -54,6 +60,14 @@ public void setUp() throws Exception { | |
when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource); | ||
|
||
sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory); | ||
cp = indexShard.getLatestReplicationCheckpoint(); | ||
newCheckpoint = new ReplicationCheckpoint( | ||
cp.getShardId(), | ||
cp.getPrimaryTerm(), | ||
cp.getSegmentsGen(), | ||
cp.getSeqNo(), | ||
cp.getSegmentInfosVersion() + 1 | ||
); | ||
} | ||
|
||
@Override | ||
|
@@ -127,22 +141,33 @@ public void testAlreadyOnNewCheckpoint() { | |
verify(spy, times(0)).startReplication(any(), any(), any()); | ||
} | ||
|
||
public void testShardAlreadyReplicating() { | ||
SegmentReplicationTargetService spy = spy(sut); | ||
public void testShardAlreadyReplicating() throws InterruptedException { | ||
SegmentReplicationTargetService serviceSpy = spy(sut); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit - A comment here explaining why we need to spy on the service and what we are asserting would be useful. |
||
// Create a separate target and start it so the shard is already replicating. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment seems out of place now since the replication is not started until multiple lines later. Consider removing it |
||
final SegmentReplicationTarget target = new SegmentReplicationTarget( | ||
checkpoint, | ||
indexShard, | ||
replicationSource, | ||
mock(SegmentReplicationTargetService.SegmentReplicationListener.class) | ||
); | ||
final SegmentReplicationTarget spyTarget = Mockito.spy(target); | ||
spy.startReplication(spyTarget); | ||
final SegmentReplicationTarget targetSpy = Mockito.spy(target); | ||
CountDownLatch latch = new CountDownLatch(1); | ||
doAnswer(invocation -> { | ||
final ActionListener<Void> listener = invocation.getArgument(0); | ||
// a new checkpoint arrives before we've completed. | ||
serviceSpy.onNewCheckpoint(newCheckpoint, indexShard); | ||
listener.onResponse(null); | ||
latch.countDown(); | ||
return null; | ||
}).when(targetSpy).startReplication(any()); | ||
doNothing().when(targetSpy).onDone(); | ||
|
||
// a new checkpoint comes in for the same IndexShard. | ||
spy.onNewCheckpoint(checkpoint, indexShard); | ||
verify(spy, times(0)).startReplication(any(), any(), any()); | ||
spyTarget.markAsDone(); | ||
// start replication of this shard the first time. | ||
serviceSpy.startReplication(targetSpy); | ||
|
||
// wait for the new checkpoint to arrive, before the listener completes. | ||
latch.await(30, TimeUnit.SECONDS); | ||
verify(serviceSpy, times(0)).startReplication(eq(newCheckpoint), eq(indexShard), any()); | ||
} | ||
|
||
public void testNewCheckpointBehindCurrentCheckpoint() { | ||
|
@@ -163,14 +188,6 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc | |
allowShardFailures(); | ||
SegmentReplicationTargetService spy = spy(sut); | ||
IndexShard spyShard = spy(indexShard); | ||
ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); | ||
ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( | ||
cp.getShardId(), | ||
cp.getPrimaryTerm(), | ||
cp.getSegmentsGen(), | ||
cp.getSeqNo(), | ||
cp.getSegmentInfosVersion() + 1 | ||
); | ||
ArgumentCaptor<SegmentReplicationTargetService.SegmentReplicationListener> captor = ArgumentCaptor.forClass( | ||
SegmentReplicationTargetService.SegmentReplicationListener.class | ||
); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick - more descriptive variable names, please.
cp
is hard to decipher at first glance andnewCheckpoint
is confusing when it's being initialized to be equal tocp