Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -531,25 +531,28 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
// acked count at least one
long ackedCount = 0;
long batchSize = getBatchSize(msgId);
long batchSize = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value may be 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not batch message, the batchSize can not be able to 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, If not batch message, the batch size should be 0, right?

if (msgId.hasBatchSize()) {
batchSize = msgId.getBatchSize();
// ack batch messages set ackeCount = batchSize
ackedCount = msgId.getBatchSize();
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
// ack no batch message set ackedCount = 1
ackedCount = 1;
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
position.setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = batchSize;
}
if (msgId.hasBatchSize()) {
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,52 @@ public void testSendTxnMessageTimeout() throws Exception {
}
}

@Test
public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
final String subName = "test";
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.sendTimeout(1, TimeUnit.SECONDS)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName)
.subscribe();

// send 5 messages with one batch
for (int i = 0; i < 5; i++) {
producer.sendAsync((i + "").getBytes(UTF_8));
}

List<MessageId> messageIds = new ArrayList<>();

// receive the batch messages add to a list
for (int i = 0; i < 5; i++) {
messageIds.add(consumer.receive().getMessageId());
}

MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);


// remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck
getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
.remove(messageId.ledgerId, messageId.entryId);

Transaction txn = getTxn();
consumer.acknowledgeAsync(messageIds.get(1), txn).get();

// ack one message, the unack count is 4
assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
}

@Test
public void testSendTxnAckMessageToDLQ() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
Expand Down