Skip to content

Commit

Permalink
[fix][broker] Fix incorrect unack msk count when dup ack a message (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Aug 21, 2023
1 parent 80a8f8d commit 6e59208
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
boolean individualAck = false;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
Expand All @@ -514,14 +515,18 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
individualAck = true;
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

if (individualAck) {
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
} else {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
positionsAcked.add(position);

checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
Expand Down Expand Up @@ -683,10 +688,11 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
}
}

private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
removePendingAcks(position);
return removePendingAcks(position);
}
return false;
}

private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Expand Down Expand Up @@ -953,7 +959,7 @@ public int hashCode() {
*
* @param position
*/
private void removePendingAcks(PositionImpl position) {
private boolean removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
Expand All @@ -974,7 +980,7 @@ private void removePendingAcks(PositionImpl position) {
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return;
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
Expand All @@ -988,7 +994,9 @@ private void removePendingAcks(PositionImpl position) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}
return false;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1597,4 +1597,30 @@ public void testIsSystemTopicAllowAutoTopicCreationAsync() throws Exception {
assertTrue(brokerService.isAllowAutoTopicCreationAsync(
"persistent://pulsar/system/my-system-topic").get());
}

@Test
public void testDuplicateAcknowledgement() throws Exception {
final String ns = "prop/ns-test";

admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.subscribe();
producer.send("1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer1.receive();
consumer1.acknowledge(message);
consumer1.acknowledge(message);
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}
}

0 comments on commit 6e59208

Please sign in to comment.