Description
Before Creating the Bug Report
- I found a bug; this is not just a question (questions should be raised in GitHub Discussions).
- I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
- I have confirmed that this bug belongs to the current repository, not to another RocketMQ repository.
Runtime platform environment
- OS: Ubuntu
RocketMQ version
- RocketMQ Broker/Namesrv version: 5.3.1
- RocketMQ Client SDK version: 5.3.1
JDK Version
openjdk version "1.8.0_432"
Describe the Bug
In broadcast consumption mode, when a new consumer starts with ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, it is not guaranteed to receive all messages produced after it starts. Because the consumer has no pull offset stored locally, its starting offset is whatever the broker reports as the latest offset at the moment the CONSUME_FROM_LAST_OFFSET setting is first resolved (some time after consumer.start() returns), rather than at the consumer's actual start time. Messages sent in that window sit below the resolved offset and are never delivered to the consumer.
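The timing problem can be illustrated with a toy model that does not use RocketMQ at all. It assumes a single queue whose offsets are simply message indices, and a consumer that resolves CONSUME_FROM_LAST_OFFSET to the queue's current max offset only after some messages have already been sent. The names LazyStartOffsetDemo, ToyQueue and simulate are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the race (not RocketMQ code): one queue whose offsets grow as
 * messages are appended, and a consumer whose starting offset is resolved lazily
 * as "current max offset" some time after start() has returned.
 */
public class LazyStartOffsetDemo {

    /** Minimal in-memory queue; the offset of the next message equals the current size. */
    static final class ToyQueue {
        private final List<String> messages = new ArrayList<>();

        void append(String body) {
            messages.add(body);
        }

        long maxOffset() {
            return messages.size();
        }

        List<String> readFrom(long offset) {
            return new ArrayList<>(messages.subList((int) offset, messages.size()));
        }
    }

    /**
     * Sends {@code total} messages after the consumer "starts", but resolves the starting
     * offset only after {@code sentBeforeResolve} of them were already appended.
     * Returns how many messages the consumer would receive.
     */
    static int simulate(int total, int sentBeforeResolve) {
        ToyQueue queue = new ToyQueue();
        // consumer.start() has returned here; no local offset exists yet.
        for (int i = 0; i < sentBeforeResolve; i++) {
            queue.append("msg-" + i);
        }
        // Offset resolution happens now: "last offset" means the queue's current max offset.
        long startOffset = queue.maxOffset();
        for (int i = sentBeforeResolve; i < total; i++) {
            queue.append("msg-" + i);
        }
        return queue.readFrom(startOffset).size();
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 0)); // prints 10: offset resolved before any send
        System.out.println(simulate(10, 4)); // prints 6: the first 4 messages are skipped
    }
}
```

In this model the number of missed messages equals the number sent before the offset is resolved, which matches the observation that waiting a few seconds after consumer.start() avoids the loss.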
Steps to Reproduce
    package com.rocketmq;

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

    public class RocketmqBroadCastTest {

        public static void main(String[] args)
                throws InterruptedException, MQClientException, MQBrokerException, RemotingException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("TopicTest001", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.setMessageModel(MessageModel.BROADCASTING);
            // NOTE: brand-new consumer group, so no offset exists locally yet
            consumer.setConsumerGroup("cg_TopicTest001_004");

            int totalCount = 10;
            AtomicInteger receivedCount = new AtomicInteger(0);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    receivedCount.addAndGet(msgs.size());
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");

            // NOTE: if we wait a few seconds here for the consumer to finish starting,
            // all messages are consumed
            // Thread.sleep(3000);
            sendMsg(totalCount);
            Thread.sleep(5000); // wait for the consumer to consume the messages
            consumer.shutdown();

            // NOTE: the received count is less than the total count
            System.out.printf("Received %d messages, expected %d%n", receivedCount.get(), totalCount);
        }

        public static void sendMsg(int count)
                throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            for (int i = 0; i < count; i++) {
                Message msg = new Message("TopicTest001",
                        "TagA", "OrderID188",
                        ("msg-" + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(msg);
                if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    throw new RuntimeException("send msg error");
                }
            }
            producer.shutdown();
            System.out.printf("send msg over%n");
        }
    }
What Did You Expect to See?
When a consumer starts in broadcast mode with CONSUME_FROM_LAST_OFFSET, it should receive all messages produced after its start time.
What Did You See Instead?
Some messages produced immediately after the consumer starts are lost: the test prints a received count lower than the expected 10.
Additional Context
Proposed Solution
Enhance LocalFileOffsetStore.load() to:
- Check if local offset exists
- If no local offset, fetch initial offset from broker based on ConsumeFromWhere setting
- Use this as the starting point for consumption
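The three steps above could look roughly like the following sketch. It is not RocketMQ code: StartPosition, BrokerOffsets, loadOrInit and putLocal are hypothetical stand-ins for the ConsumeFromWhere setting, the broker offset queries, and the local offset file. It assumes the caller invokes loadOrInit for each queue during startup, so the starting offset is fixed at load time instead of at the first later lookup. A real implementation would presumably also need the topic's queue list at load time, which the sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the proposed behaviour: if the local store has no offset for a queue,
 * fetch an initial offset from the broker according to the start position and
 * record it locally. All types here are hypothetical stand-ins.
 */
public class EagerInitialOffsetSketch {

    /** Hypothetical mirror of the relevant ConsumeFromWhere choices. */
    enum StartPosition { LAST_OFFSET, FIRST_OFFSET, TIMESTAMP }

    /** Hypothetical broker lookups; real client code would query the broker here. */
    interface BrokerOffsets {
        long minOffset(String queueId);
        long maxOffset(String queueId);
        long offsetForTimestamp(String queueId, long timestampMillis);
    }

    // Stand-in for the offsets that LocalFileOffsetStore.load() reads from disk.
    private final Map<String, Long> localOffsets = new HashMap<>();

    void putLocal(String queueId, long offset) {
        localOffsets.put(queueId, offset);
    }

    /** Returns the local offset if present; otherwise resolves one from the broker and stores it. */
    long loadOrInit(String queueId, StartPosition position, long timestampMillis, BrokerOffsets broker) {
        Long local = localOffsets.get(queueId);
        if (local != null) {
            return local; // step 1: a local offset exists, keep using it
        }
        long initial; // step 2: no local offset, ask the broker based on the start position
        switch (position) {
            case FIRST_OFFSET:
                initial = broker.minOffset(queueId);
                break;
            case TIMESTAMP:
                initial = broker.offsetForTimestamp(queueId, timestampMillis);
                break;
            case LAST_OFFSET:
            default:
                initial = broker.maxOffset(queueId);
                break;
        }
        localOffsets.put(queueId, initial); // step 3: this becomes the starting point
        return initial;
    }

    public static void main(String[] args) {
        // Fake broker: every queue currently holds offsets [0, 42).
        BrokerOffsets broker = new BrokerOffsets() {
            public long minOffset(String q) { return 0L; }
            public long maxOffset(String q) { return 42L; }
            public long offsetForTimestamp(String q, long ts) { return 7L; }
        };
        EagerInitialOffsetSketch store = new EagerInitialOffsetSketch();
        store.putLocal("q1", 5L);
        System.out.println(store.loadOrInit("q0", StartPosition.LAST_OFFSET, 0L, broker)); // prints 42
        System.out.println(store.loadOrInit("q1", StartPosition.LAST_OFFSET, 0L, broker)); // prints 5
    }
}
```

Because the resolved offset is recorded immediately, messages sent after startup but before the first pull land at or above that offset and are still delivered.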