Skip to content

Commit

Permalink
Fix flaky test TopicReaderTest.testMultiReaderIsAbleToSeekWithTimeOnM…
Browse files Browse the repository at this point in the history
…iddleOfTopic (apache#9375)

Fixed: apache#9369
The old way uses an estimated method to generate the seek time stamp, which will make the entry number after that timestamp unstable.
  • Loading branch information
Renkai authored Feb 1, 2021
1 parent 88c9e09 commit f7f9406
Showing 1 changed file with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1382,24 +1382,23 @@ public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exceptio
final int halfMessages = numOfMessage / 2;
admin.topics().createPartitionedTopic(topicName, 3);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

long l = System.currentTimeMillis();
long halfTime = 0;
for (int i = 0; i < numOfMessage; i++) {
if (i == numOfMessage / 2) {
halfTime = System.currentTimeMillis();
}
producer.send(String.format("msg num %d", i).getBytes());
}

Assert.assertTrue(halfTime != 0);
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();

int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);

reader.seek(halfTime);
Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
String receivedMessage = new String(message.getData());
Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage);
}

reader.close();
producer.close();
}
Expand Down

0 comments on commit f7f9406

Please sign in to comment.