Skip to content

Commit

Permalink
[fix][broker] Fix consumer does not abide by the max unacks limitatio…
Browse files Browse the repository at this point in the history
…n for Shared subscription (apache#16670)
  • Loading branch information
codelipenghui authored Jul 20, 2022
1 parent 3d15343 commit 42fe060
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (c.getMaxUnackedMessages() > 0) {
availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
}
if (log.isDebugEnabled() && !c.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
+ "availablePermits are {}", topic.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,46 @@ public void testConsumerBlockingWithUnAckedMessagesMultipleIteration(boolean ack
}
}

@Test(dataProvider = "ackReceiptEnabled")
public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException {
final int maxUnacks = 10;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks);
final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits";

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(ackReceiptEnabled)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false)
.topic(topic)
.create();

final int messages = 1000;
for (int i = 0; i < messages; i++) {
producer.sendAsync("Message - " + i);
}
producer.flush();
List<MessageId> receives = new ArrayList<>();
for (int i = 0; i < maxUnacks; i++) {
Message<String> received = consumer.receive();
log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
receives.add(received.getMessageId());
}
assertNull(consumer.receive(3, TimeUnit.SECONDS));
consumer.acknowledge(receives);
for (int i = 0; i < messages - maxUnacks; i++) {
Message<String> received = consumer.receive();
log.info("Received message {} with message ID {}", received.getValue(), received.getMessageId());
consumer.acknowledge(received);
}
}

/**
* Verify: Consumer1 which doesn't send ack will not impact Consumer2 which sends ack for consumed message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}

// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals(messages1.size(), receiverQueueSize);
assertEquals(messages1.size(), unAckedMessagesBufferSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Expand Down

0 comments on commit 42fe060

Please sign in to comment.