
# [BUG] Messages may be lost in broadcast mode when consumer starts #9281

Open
@gogodjzhu

Description


Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

  • OS: Ubuntu

RocketMQ version

  • RocketMQ Broker/Namesvr version: 5.3.1
  • RocketMQ Client SDK version: 5.3.1

JDK Version

openjdk version "1.8.0_432"

Describe the Bug

In broadcast consumption mode, a new consumer started with ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET is not guaranteed to receive all messages produced after it starts. The consumer has no locally stored pull offset, so its starting position is determined by the broker's state at the moment the client applies the CONSUME_FROM_LAST_OFFSET setting, not at the actual consumer start time. Messages sent in that window are skipped.
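The timing gap can be illustrated with a toy model. This is not RocketMQ code: `ToyQueue` and `simulate` are hypothetical names, and the model assumes that the lazy path resolves the start position to the queue's current max offset when the queue is first assigned (the first rebalance). It compares that with pinning the offset at `start()` time.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not RocketMQ code) of lazy vs. eager resolution of LAST_OFFSET. */
public class LastOffsetRaceDemo {

    /** Hypothetical stand-in for one broker queue; offsets are list indices. */
    static final class ToyQueue {
        private final List<String> messages = new ArrayList<>();

        void append(String body) {
            messages.add(body);
        }

        /** Next offset to be written, i.e. the queue's "max offset". */
        long maxOffset() {
            return messages.size();
        }

        List<String> readFrom(long offset) {
            return new ArrayList<>(messages.subList((int) offset, messages.size()));
        }
    }

    /** Returns how many messages sent after start() the consumer would read. */
    static int simulate(boolean resolveAtStart, int sentBeforeRebalance, int sentAfterRebalance) {
        ToyQueue queue = new ToyQueue();
        queue.append("old-0"); // history that LAST_OFFSET is meant to skip
        queue.append("old-1");

        // consumer.start(): the eager strategy captures the max offset right now
        long startOffset = resolveAtStart ? queue.maxOffset() : -1L;

        // producer sends before the consumer's first rebalance
        for (int i = 0; i < sentBeforeRebalance; i++) {
            queue.append("early-" + i);
        }

        // first rebalance: the lazy strategy only now asks for the max offset
        if (startOffset < 0) {
            startOffset = queue.maxOffset();
        }

        for (int i = 0; i < sentAfterRebalance; i++) {
            queue.append("late-" + i);
        }
        return queue.readFrom(startOffset).size();
    }

    public static void main(String[] args) {
        System.out.println("lazy:  " + simulate(false, 4, 6)); // 6 of 10 new messages
        System.out.println("eager: " + simulate(true, 4, 6));  // 10 of 10 new messages
    }
}
```

With 4 messages sent before the first rebalance and 6 after, the lazy variant reads only the 6 later messages, while the eager variant reads all 10.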

Steps to Reproduce

```java
package com.rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

public class RocketmqBroadCastTest {
    public static void main(String[] args)
            throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest001", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumerGroup("cg_TopicTest001_004"); // NOTE: brand-new consumer group, so no local offset exists

        int totalCount = 10;

        AtomicInteger receivedCount = new AtomicInteger(0);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                receivedCount.addAndGet(msgs.size());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

        // Thread.sleep(3000); // NOTE: if the consumer is given a few seconds before sending, all messages are consumed

        sendMsg(totalCount);
        Thread.sleep(5000); // give the consumer time to consume the messages
        consumer.shutdown();

        // NOTE: the received count ends up lower than the total count
        System.out.printf("Received %d messages, expected %d%n", receivedCount.get(), totalCount);
    }

    public static void sendMsg(int count)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < count; i++) {
            Message msg = new Message("TopicTest001",
                    "TagA", "OrderID188",
                    ("msg-" + i).getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(msg);
            if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
                throw new RuntimeException("send msg error");
            }
        }
        producer.shutdown();
        System.out.printf("send msg over%n");
    }
}
```

What Did You Expect to See?

When a consumer starts in broadcast mode with CONSUME_FROM_LAST_OFFSET, it should receive all messages produced after its start time.

What Did You See Instead?

Some messages produced immediately after the consumer starts are lost: the printed received count is lower than the expected total.

Additional Context

Proposed Solution

Enhance LocalFileOffsetStore.load() to:

  1. Check whether a local offset exists
  2. If there is no local offset, fetch the initial offset from the broker based on the ConsumeFromWhere setting
  3. Use this offset as the starting point for consumption
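The three steps above can be sketched as follows. This is a minimal sketch, not the real LocalFileOffsetStore: `EagerOffsetStoreSketch`, `StartPolicy`, and the two broker-query functions are hypothetical, and it assumes the store can ask the broker for a queue's min/max offset and already knows the topic's queues when `load()` runs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of an offset store that pins missing offsets in load(). */
public class EagerOffsetStoreSketch {

    /** Simplified stand-in for ConsumeFromWhere. */
    enum StartPolicy { FROM_FIRST_OFFSET, FROM_LAST_OFFSET }

    private final Map<String, Long> offsets = new HashMap<>();
    private final StartPolicy policy;
    private final ToLongFunction<String> brokerMinOffset; // assumed broker query
    private final ToLongFunction<String> brokerMaxOffset; // assumed broker query

    EagerOffsetStoreSketch(StartPolicy policy,
                           ToLongFunction<String> brokerMinOffset,
                           ToLongFunction<String> brokerMaxOffset) {
        this.policy = policy;
        this.brokerMinOffset = brokerMinOffset;
        this.brokerMaxOffset = brokerMaxOffset;
    }

    /**
     * @param localOffsets offsets read from the local offset file (may be empty)
     * @param queues       queues of the subscribed topic
     */
    void load(Map<String, Long> localOffsets, Set<String> queues) {
        offsets.putAll(localOffsets); // existing local offsets always win
        for (String queue : queues) {
            if (!offsets.containsKey(queue)) { // 1. no local offset for this queue
                // 2. ask the broker, according to the start policy
                long initial = policy == StartPolicy.FROM_LAST_OFFSET
                        ? brokerMaxOffset.applyAsLong(queue)
                        : brokerMinOffset.applyAsLong(queue);
                offsets.put(queue, initial); // 3. consumption starts here
            }
        }
    }

    /** Returns the stored offset, or -1 if the queue is unknown. */
    long readOffset(String queue) {
        return offsets.getOrDefault(queue, -1L);
    }

    public static void main(String[] args) {
        Map<String, Long> local = new HashMap<>();
        local.put("q0", 5L); // q0 already has a local offset
        EagerOffsetStoreSketch store = new EagerOffsetStoreSketch(
                StartPolicy.FROM_LAST_OFFSET, q -> 0L, q -> 100L);
        store.load(local, new HashSet<>(Arrays.asList("q0", "q1")));
        System.out.println(store.readOffset("q0")); // 5
        System.out.println(store.readOffset("q1")); // 100
    }
}
```

Assuming `load()` runs inside `consumer.start()`, the starting offset is fixed before `start()` returns. Any message sent afterwards then lands at or beyond that offset instead of depending on when the first rebalance happens. Local offsets still take precedence, so a restarted consumer keeps resuming from where it left off.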
