Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
The broker and client run on the same machine and use the same Java runtime.
- OS: Linux version 5.4.241-1-tlinux4-0017.16 (mockbuild@VM-81-152-TS3) (gcc version 8.5.0 20210514 (Tencent 8.5.0-23) (GCC)) SMP Thu Dec 12 21:16:37 CST 2024
- Broker version: 4.1.1
- Client version: 4.1.1 (Java)
- Java version:
- openjdk version "24.0.2" 2025-07-15
- OpenJDK Runtime Environment Temurin-24.0.2+12 (build 24.0.2+12)
- OpenJDK 64-Bit Server VM Temurin-24.0.2+12 (build 24.0.2+12, mixed mode, sharing)
Issue Description
What happened
What I did
Continuously produced a fixed number of messages to a partitioned topic, consumed them independently with two subscriptions (sub-1 and sub-2), and checked their consistency. "Consistent" means that every message that was sent has been received by all subscriptions. The check runs after a message is received and before it is acknowledged.
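The consistency rule can be modeled in plain Java as a set difference per subscription. This is a minimal sketch, not part of the reproducer: message IDs are plain strings such as `0:123:0:45` rather than Pulsar `MessageId` objects, and `ConsistencyCheck` and `findMissing` are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ConsistencyCheck {
    // Hypothetical helper: for each subscription, returns the sent IDs it never received.
    // IDs are plain strings here, not Pulsar MessageId objects.
    static Map<String, Set<String>> findMissing(Set<String> sentIds,
                                                Map<String, Set<String>> receivedBySubscription) {
        Map<String, Set<String>> missing = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : receivedBySubscription.entrySet()) {
            Set<String> notReceived = new TreeSet<>(sentIds);
            notReceived.removeAll(entry.getValue()); // sent, but never seen on this subscription
            if (!notReceived.isEmpty()) {
                missing.put(entry.getKey(), notReceived);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<String> sent = Set.of("0:123:0:44", "0:123:0:45", "0:123:0:46");
        Map<String, Set<String>> received = Map.of(
                "sub-1", Set.of("0:123:0:44", "0:123:0:45", "0:123:0:46"),
                "sub-2", Set.of("0:123:0:44", "0:123:0:46"));
        findMissing(sent, received).forEach((sub, ids) ->
                ids.forEach(id -> System.out.printf("[%s] not received from [%s]!%n", id, sub)));
        // prints: [0:123:0:45] not received from [sub-2]!
    }
}
```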
Options I used (defaults for anything not mentioned):
- producer
- batchingMaxMessages = 1000
- consumer
- enableBatchIndexAcknowledgment = true
- ack mode = Individual
- acknowledgmentGroupTime = 100ms
- maxAcknowledgmentGroupSize = 1000
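A toy model of how the two grouping options interact, assuming a simple policy: buffered acks are flushed once `maxAcknowledgmentGroupSize` acks are pending or once `acknowledgmentGroupTime` has elapsed since the last flush, and a group time of 0 sends every ack immediately. This is not the client's implementation (the real client flushes on a timer, while this sketch only checks the clock when an ack arrives), and `AckGroupingModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of size-or-time ack grouping. Assumption: NOT the Pulsar client's code;
// the clock is only checked when a new ack arrives.
public class AckGroupingModel {
    private final long groupTimeMs; // stands in for acknowledgmentGroupTime
    private final int maxGroupSize; // stands in for maxAcknowledgmentGroupSize
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMs;

    AckGroupingModel(long groupTimeMs, int maxGroupSize, long startMs) {
        this.groupTimeMs = groupTimeMs;
        this.maxGroupSize = maxGroupSize;
        this.lastFlushMs = startMs;
    }

    // Buffers one ack and returns the acks flushed to the broker (empty while still buffering).
    List<String> acknowledge(String messageId, long nowMs) {
        pending.add(messageId);
        boolean immediate = groupTimeMs == 0; // assumed: a group time of 0 disables grouping
        boolean sizeReached = pending.size() >= maxGroupSize;
        boolean timeElapsed = nowMs - lastFlushMs >= groupTimeMs;
        if (immediate || sizeReached || timeElapsed) {
            List<String> flushed = new ArrayList<>(pending);
            pending.clear();
            lastFlushMs = nowMs;
            return flushed;
        }
        return List.of();
    }

    public static void main(String[] args) {
        AckGroupingModel tracker = new AckGroupingModel(100, 1000, 0);
        System.out.println(tracker.acknowledge("0:123:0:0", 10));  // [] (still buffering)
        System.out.println(tracker.acknowledge("0:123:0:1", 120)); // [0:123:0:0, 0:123:0:1]
    }
}
```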
Expected
Every produced message is received by every subscription.
If 1,000,000 messages are produced, I should get:
--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-2]: 1000000 acks
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-1]: 1000000 acks
Happened
Occasionally, one message at a random batch index is missing from one subscription:
--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-2]: 999999 acks
2026-01-14 11:56:07.351 [main] INFO org.example.Main - Subscription [sub-1]: 1000000 acks
2026-01-14 11:56:07.351 [main] ERROR org.example.Main - [0:123:0:45] not received from [sub-2]!
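The IDs in the error output appear to use the Java client's batch message ID string form `ledgerId:entryId:partitionIndex:batchIndex` (an assumption about its `toString()` formatting). Read that way, `[0:123:0:45]` is batch index 45 of entry 123 in ledger 0 on partition 0, which is the same ledger and entry as the `(0, 123, {})` acknowledgment in the DEBUG output. A small parser makes the fields explicit; `BatchMessageIdParts` is a hypothetical helper, not a Pulsar class.

```java
// Hypothetical helper: splits an ID string assumed to be ledgerId:entryId:partitionIndex:batchIndex.
public class BatchMessageIdParts {
    record Parts(long ledgerId, long entryId, int partitionIndex, int batchIndex) {}

    static Parts parse(String id) {
        String[] fields = id.split(":");
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 colon-separated fields: " + id);
        }
        return new Parts(Long.parseLong(fields[0]), Long.parseLong(fields[1]),
                Integer.parseInt(fields[2]), Integer.parseInt(fields[3]));
    }

    public static void main(String[] args) {
        System.out.println(parse("0:123:0:45"));
        // prints: Parts[ledgerId=0, entryId=123, partitionIndex=0, batchIndex=45]
    }
}
```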
Why this is a bug
If enableBatchIndexAcknowledgment is set to false, or acknowledgmentGroupTime is set to 0, the issue does not occur.
The broker also still reports a backlog pointing at this entry, yet consumer.receive() never returns the missing message, even after calling redeliverUnacknowledgedMessages().
**The issue does not occur with the Go client, which does not support batch index acknowledgment.**
Error messages
With DEBUG logging enabled for .apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker, the following flush appears:
Flushing pending acks to broker: last-cumulative-ack: [] -- individual-acks: [] -- individual-batch-index-acks: [(0, 123, {})]
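In this DEBUG line, the third element of `(0, 123, {})` is printed like a `java.util.BitSet`, where `{}` is the empty set. As a simplified model, assuming the client's convention that a set bit marks a batch index that is still unacknowledged, an empty set would report every index of entry 123 as acknowledged; if this flush came from sub-2's consumer, that would include index 45, which sub-2 never received. The sketch only illustrates that convention with `java.util.BitSet`; `BatchAckSetModel` and the batch size of 100 are hypothetical.

```java
import java.util.BitSet;

// Simplified model of a batch-index ack set. Assumption: a set bit means
// "this batch index is still unacknowledged", mirroring the Java client's convention.
public class BatchAckSetModel {
    // A fresh ack set for a batch: every index starts out unacknowledged.
    static BitSet newAckSet(int batchSize) {
        BitSet bits = new BitSet(batchSize);
        bits.set(0, batchSize);
        return bits;
    }

    public static void main(String[] args) {
        int batchSize = 100; // hypothetical batch size for entry 0:123
        BitSet ackSet = newAckSet(batchSize);
        // Acknowledge every index except 45, which the subscription never received.
        for (int i = 0; i < batchSize; i++) {
            if (i != 45) {
                ackSet.clear(i);
            }
        }
        System.out.println(ackSet); // prints {45}: index 45 is still pending
        ackSet.clear(45);
        System.out.println(ackSet); // prints {}: the whole entry counts as acknowledged
    }
}
```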
Reproducing the issue
Reproducer:
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PulsarBatchAckPseudoDemo {
    static final String TOPIC = "partitioned_topic";
    // IDs of every message whose send was confirmed by the broker.
    static final Set<MessageId> sentMessageIds = ConcurrentHashMap.newKeySet();
    // For each message ID, the subscriptions that received it.
    static final Map<MessageId, Set<String>> receiptTracker = new ConcurrentHashMap<>();
    static final Map<String, AtomicLong> ackCounters =
            Map.of("sub-1", new AtomicLong(0), "sub-2", new AtomicLong(0));

    // Functional interface for tasks that may throw checked exceptions.
    interface Task {
        void run() throws Exception;
    }

    public static void main(String[] args) throws Exception {
        // Assume a partitioned topic is already created.
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
        // Run producer and consumers concurrently in background threads.
        startThread(() -> producerTask(client));
        startThread(() -> consumerTask(client, "sub-1"));
        startThread(() -> consumerTask(client, "sub-2"));
        // Let the test run for a generous amount of time, then verify.
        Thread.sleep(90_000);
        // --- Verification ---
        System.out.println("--- Total Acked Messages per Subscription ---");
        System.out.printf("Subscription [sub-1]: %d acks%n", ackCounters.get("sub-1").get());
        System.out.printf("Subscription [sub-2]: %d acks%n", ackCounters.get("sub-2").get());
        // Find and report any message that wasn't received by BOTH subscriptions.
        for (MessageId sentId : sentMessageIds) {
            Set<String> receivedBy = receiptTracker.getOrDefault(sentId, Collections.emptySet());
            if (receivedBy.size() < 2) {
                if (!receivedBy.contains("sub-1")) {
                    System.err.printf("[%s] not received from [sub-1]!%n", sentId);
                }
                if (!receivedBy.contains("sub-2")) {
                    System.err.printf("[%s] not received from [sub-2]!%n", sentId);
                }
            }
        }
        client.close();
    }

    // Runs a task on a daemon thread so it does not keep the JVM alive after main() returns.
    static void startThread(Task task) {
        Thread thread = new Thread(() -> {
            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    // --- Producer Task Logic ---
    static void producerTask(PulsarClient client) throws Exception {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .batchingMaxMessages(1000)
                .create();
        for (int i = 0; i < 1_000_000; i++) {
            // Asynchronously send and register the MessageId upon completion.
            producer.sendAsync("message-payload-" + i).thenAccept(messageId -> {
                sentMessageIds.add(messageId);
                // computeIfAbsent instead of put: a consumer may already have recorded a
                // receipt for this ID, and put() would overwrite it.
                receiptTracker.computeIfAbsent(messageId, id -> ConcurrentHashMap.newKeySet());
            });
        }
        producer.flush(); // send the last, partially filled batch
    }

    // --- Consumer Task Logic ---
    static void consumerTask(PulsarClient client, String subscriptionName) throws Exception {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName(subscriptionName)
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .maxAcknowledgmentGroupSize(1000)
                .subscribe();
        while (true) { // Runs until the JVM exits.
            Message<String> message = consumer.receive();
            // Step 1: Mark as received BEFORE acknowledging. computeIfAbsent also covers a
            // message that arrives before its send future has completed on the producer side.
            receiptTracker.computeIfAbsent(message.getMessageId(), id -> ConcurrentHashMap.newKeySet())
                    .add(subscriptionName);
            // Step 2: Increment the counter for the final report.
            ackCounters.get(subscriptionName).incrementAndGet();
            // Step 3: Acknowledge the message individually.
            consumer.acknowledge(message);
        }
    }
}
Additional information
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!