
[Bug] Java consumer occasionally missing one message of a batched entry #25145

@YinY1

Description

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

The broker and client run on the same machine and use the same Java runtime.

  • OS: Linux version 5.4.241-1-tlinux4-0017.16 (mockbuild@VM-81-152-TS3) (gcc version 8.5.0 20210514 (Tencent 8.5.0-23) (GCC)) SMP Thu Dec 12 21:16:37 CST 2024
  • Broker version: 4.1.1
  • Client version: 4.1.1 (Java)
  • Java version:
    • openjdk version "24.0.2" 2025-07-15
    • OpenJDK Runtime Environment Temurin-24.0.2+12 (build 24.0.2+12)
    • OpenJDK 64-Bit Server VM Temurin-24.0.2+12 (build 24.0.2+12, mixed mode, sharing)

Issue Description

What happened

What I did

Continuously produced a fixed number of messages to a partitioned topic, consumed them independently with two subscriptions [sub-1, sub-2], and checked their consistency. A message is consistent when it has been sent and then received by every subscription; the check is recorded after a message is received and before it is acknowledged.
Options used (defaults apply to anything not listed):

  • producer
    • batchingMaxMessages = 1000
  • consumer
    • enableBatchIndexAcknowledgment = true
    • ack mode = Individual
    • acknowledgmentGroupTime = 100ms
    • maxAcknowledgmentGroupSize = 1000
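
The two grouping options work together: acknowledgments are buffered on the client and sent to the broker when the group time elapses or when the number of pending acknowledgments reaches the group size, and a group time of 0 sends them immediately. The sketch below is a simplified, hypothetical model of that flush rule only; the class `AckGroupingModel` and its methods are illustrative and are not Pulsar's `PersistentAcknowledgmentsGroupingTracker`. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of client-side ack grouping (not Pulsar's implementation).
// Pending acks are flushed when maxGroupSize is reached, when groupTimeMillis has elapsed
// since the last flush, or immediately when groupTimeMillis is 0.
class AckGroupingModel {
    private final long groupTimeMillis;
    private final int maxGroupSize;
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMillis;

    AckGroupingModel(long groupTimeMillis, int maxGroupSize, long startMillis) {
        this.groupTimeMillis = groupTimeMillis;
        this.maxGroupSize = maxGroupSize;
        this.lastFlushMillis = startMillis;
    }

    // Buffers one ack; returns the flushed group if a flush happened, else an empty list.
    List<String> ack(String messageId, long nowMillis) {
        pending.add(messageId);
        if (groupTimeMillis == 0 || pending.size() >= maxGroupSize) {
            return flush(nowMillis);
        }
        return List.of();
    }

    // Called periodically, like a timer; flushes if the group time has elapsed.
    List<String> tick(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - lastFlushMillis >= groupTimeMillis) {
            return flush(nowMillis);
        }
        return List.of();
    }

    private List<String> flush(long nowMillis) {
        List<String> group = new ArrayList<>(pending);
        pending.clear();
        lastFlushMillis = nowMillis;
        return group;
    }

    public static void main(String[] args) {
        AckGroupingModel model = new AckGroupingModel(100, 3, 0);
        System.out.println(model.ack("a", 10));   // []
        System.out.println(model.ack("b", 20));   // []
        System.out.println(model.tick(50));       // [] (only 50 ms since last flush)
        System.out.println(model.tick(100));      // [a, b] (group time elapsed)
        model.ack("c", 110);
        model.ack("d", 120);
        System.out.println(model.ack("e", 130));  // [c, d, e] (size limit reached)
    }
}
```

With the options listed above, the model corresponds to `new AckGroupingModel(100, 1000, start)`; the `acknowledgmentGroupTime = 0` case mentioned below maps to the immediate-flush branch.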

Expected

All messages produced could be received from all subscriptions.
For example, after producing 1,000,000 messages, I expect:

--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription [sub-2]: 1000000 acks
2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription [sub-1]: 1000000 acks 

Happened

Occasionally, one subscription misses a single message at a random batch index:

--- Total Acked Messages per Subscription ---
2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription [sub-2]: 999999 acks
2026-01-14 11:56:07.351 [main] INFO  org.example.Main - Subscription [sub-1]: 1000000 acks 
2026-01-14 11:56:07.351 [main] ERROR org.example.Main - [0:123:0:45] not received from [sub-2]!
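
The missing ID `0:123:0:45` is the string form of a batch message ID, which Pulsar prints as `ledgerId:entryId:partitionIndex:batchIndex`. Splitting it shows that the lost message is batch index 45 inside entry 123 of ledger 0, the same (ledger, entry) pair that appears in the debug log below. `BatchIdParts` is a hypothetical helper for illustration, not a Pulsar API.

```java
// Hypothetical helper (not a Pulsar API): splits the printed form of a batch message ID,
// assumed to be "ledgerId:entryId:partitionIndex:batchIndex", into its components.
record BatchIdParts(long ledgerId, long entryId, int partitionIndex, int batchIndex) {

    static BatchIdParts parse(String text) {
        String[] parts = text.split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields, got: " + text);
        }
        return new BatchIdParts(
                Long.parseLong(parts[0]),
                Long.parseLong(parts[1]),
                Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]));
    }

    public static void main(String[] args) {
        BatchIdParts id = parse("0:123:0:45");
        // Prints: ledger=0 entry=123 partition=0 batchIndex=45
        System.out.printf("ledger=%d entry=%d partition=%d batchIndex=%d%n",
                id.ledgerId(), id.entryId(), id.partitionIndex(), id.batchIndex());
    }
}
```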

Why this is a bug

If enableBatchIndexAcknowledgment is set to false, or acknowledgmentGroupTime is set to 0, the problem does not occur.
In addition, the broker still shows a backlog pointing at the affected entry, yet consumer.receive() never returns the missing message, even after calling redeliverUnacknowledgedMessages().

**The problem also does not occur with the Go client, which does not support batch index acknowledgment.**

Error messages

With DEBUG logging enabled for .apache.pulsar.client.impl.PersistentAcknowledgmentsGroupingTracker, the following line appears:

Flushing pending acks to broker: last-cumulative-ack: [] -- individual-acks: [] -- individual-batch-index-acks: [(0, 123, {})]
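
One way to read the `{}` in that log line is with a per-entry bit set of batch indexes. In the simplified model below, which is an assumption for illustration and not taken from Pulsar's source, set bits mark indexes that are still unacknowledged, acknowledging an index clears its bit, and an empty set therefore means every index of the entry is acknowledged. `BatchIndexAckModel` is a hypothetical class.

```java
import java.util.BitSet;

// Hypothetical, simplified model of per-entry batch-index ack state (not Pulsar's code).
// Assumption: set bits mark batch indexes that are still unacknowledged.
class BatchIndexAckModel {
    private final BitSet unacked = new BitSet();

    BatchIndexAckModel(int batchSize) {
        unacked.set(0, batchSize); // every index starts unacknowledged
    }

    void ack(int batchIndex) {
        unacked.clear(batchIndex);
    }

    boolean fullyAcked() {
        return unacked.isEmpty();
    }

    // Returns a copy so callers cannot mutate the internal state.
    BitSet remaining() {
        return (BitSet) unacked.clone();
    }

    public static void main(String[] args) {
        BatchIndexAckModel entry = new BatchIndexAckModel(4);
        entry.ack(0);
        entry.ack(2);
        System.out.println(entry.remaining());  // {1, 3}
        entry.ack(1);
        entry.ack(3);
        System.out.println(entry.remaining());  // {}
        System.out.println(entry.fullyAcked()); // true
    }
}
```

Under this model, an entry whose index 45 was never acknowledged would still report `{45}`, while `{}` would cover every index in the entry.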

Reproducing the issue

Simplified reproduction code:

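import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PulsarBatchAckPseudoDemo {

    static final String TOPIC = "partitioned_topic";
    static final Set<MessageId> sentMessageIds = ConcurrentHashMap.newKeySet();
    static final Map<MessageId, Set<String>> receiptTracker = new ConcurrentHashMap<>();
    static final Map<String, AtomicLong> ackCounters =
            Map.of("sub-1", new AtomicLong(0), "sub-2", new AtomicLong(0));

    // A task that may throw checked exceptions (replaces Lombok's @SneakyThrows).
    interface Task {
        void run() throws Exception;
    }

    public static void main(String[] args) throws Exception {

        // Assume a partitioned topic is already created.
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();

        // Run producer and consumers concurrently in background threads.
        startThread(() -> producerTask(client));
        startThread(() -> consumerTask(client, "sub-1"));
        startThread(() -> consumerTask(client, "sub-2"));

        // Let the test run for a generous amount of time, then verify.
        Thread.sleep(90_000);

        // --- Verification ---
        System.out.println("--- Total Acked Messages per Subscription ---");
        System.out.printf("Subscription [sub-1]: %d acks%n", ackCounters.get("sub-1").get());
        System.out.printf("Subscription [sub-2]: %d acks%n", ackCounters.get("sub-2").get());

        // Find and report any message that wasn't received by BOTH subscriptions.
        for (MessageId sentId : sentMessageIds) {
            Set<String> receivedBy = receiptTracker.getOrDefault(sentId, Collections.emptySet());
            if (receivedBy.size() < 2) {
                if (!receivedBy.contains("sub-1")) {
                    System.err.printf("[%s] not received from [sub-1]!%n", sentId);
                }
                if (!receivedBy.contains("sub-2")) {
                    System.err.printf("[%s] not received from [sub-2]!%n", sentId);
                }
            }
        }

        // Closing the client makes the blocked receive() calls fail, ending the consumer threads.
        client.close();
    }

    // Runs a task on a daemon thread so it cannot keep the JVM alive after main() returns.
    static void startThread(Task task) {
        Thread thread = new Thread(() -> {
            try {
                task.run();
            } catch (Exception e) {
                // Expected for the consumers once main() closes the client.
                System.err.println("Background task stopped: " + e);
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    // --- Producer Task Logic ---
    static void producerTask(PulsarClient client) throws Exception {
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(TOPIC)
                .batchingMaxMessages(1000)
                .create();

        for (int i = 0; i < 1_000_000; i++) {
            // Asynchronously send and register the MessageId upon completion.
            producer.sendAsync("message-payload-" + i).thenAccept(messageId -> {
                sentMessageIds.add(messageId);
                // computeIfAbsent: a consumer may already have recorded this ID if it
                // received the message before this callback ran.
                receiptTracker.computeIfAbsent(messageId, id -> ConcurrentHashMap.newKeySet());
            });
        }
        producer.flush(); // send the last, partially filled batch and wait for persistence
    }

    // --- Consumer Task Logic ---
    static void consumerTask(PulsarClient client, String subscriptionName) throws Exception {
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic(TOPIC)
                .subscriptionName(subscriptionName)
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
                .maxAcknowledgmentGroupSize(1000)
                .subscribe();

        while (true) { // Runs until main() closes the client.
            Message<String> message = consumer.receive();

            // Step 1: Mark as received BEFORE acknowledging. computeIfAbsent keeps the receipt
            // even if it arrives before the producer callback has registered the ID.
            receiptTracker.computeIfAbsent(message.getMessageId(), id -> ConcurrentHashMap.newKeySet())
                    .add(subscriptionName);

            // Step 2: Increment the counter for the final report.
            ackCounters.get(subscriptionName).incrementAndGet();

            // Step 3: Acknowledge the message individually.
            consumer.acknowledge(message);
        }
    }
}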

Additional information

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

No one assigned

Labels

type/bug

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests