Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Avoid consumers receiving acknowledged messages from compacted topic after reconnection #21187

Merged
merged 15 commits into from
Jan 26, 2024
Merged
Prev Previous commit
Next Next commit
Address comment
  • Loading branch information
coderzc committed Jan 26, 2024
commit 03bb92291db20c5900f89ebe84ebebba178a53f8
Original file line number Diff line number Diff line change
Expand Up @@ -2259,6 +2259,10 @@ public void testAcknowledgeWithReconnection() throws Exception {
consumer.acknowledge(message);
}

Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
5));

// Make consumer reconnect to broker
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
admin.topics().unload(topicName);

Expand All @@ -2272,6 +2276,10 @@ public void testAcknowledgeWithReconnection() throws Exception {
consumer.acknowledge(message);
}

Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
0));

Assert.assertEquals(results, expected);

Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
Expand All @@ -2285,8 +2293,11 @@ public void testAcknowledgeWithReconnection() throws Exception {
Assert.assertEquals(message2.getValue(), "V");
consumer.acknowledge(message2);

PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(internalStats.lastConfirmedEntry, internalStats.cursors.get(subName).markDeletePosition);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(internalStats.lastConfirmedEntry,
internalStats.cursors.get(subName).markDeletePosition);
});

consumer.close();
producer.close();
Expand Down