Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
summeriiii committed Sep 19, 2024
1 parent 2830f57 commit 2db9e82
Showing 1 changed file with 6 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
String subName = "test-sub";

admin.topics().createNonPartitionedTopic(topicName);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
Expand All @@ -274,23 +276,14 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
String message = "my-message-" + i;
producer.send(message.getBytes());
}
producer.close();

assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10);

// 2. receive the message
Thread t = new Thread(() -> {
while (true) {
Message<byte[]> msg;
try {
msg = consumer.receive();
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
});
t.start();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
}

// 3. consumed all messages and the msgBacklog is 0
Awaitility.await().untilAsserted(() ->
Expand Down

0 comments on commit 2db9e82

Please sign in to comment.