-
Notifications
You must be signed in to change notification settings - Fork 80
Closed
Description
Search before asking
- I searched in the issues and found nothing similar.
Version
master
Minimal reproduce step
Reproducible code:
// End-to-end reproduction: sends `numMessages` copies of `largeMessage`, receives and
// acknowledges every one of them, then expects the broker-reported message backlog of
// the consumer to reach zero.
TEST_P(MessageChunkingTest, testEndToEnd) {
    const std::string topic =
        "MessageChunkingTest-EndToEnd-" + toString(GetParam()) + std::to_string(time(nullptr));
    Consumer consumer;
    createConsumer(topic, consumer);
    Producer producer;
    createProducer(topic, producer);

    constexpr int numMessages = 10;

    // Produce: every returned message id must be backed by a ChunkMessageIdImpl.
    std::vector<MessageId> sendMessageIds;
    sendMessageIds.reserve(numMessages);
    for (int i = 0; i < numMessages; i++) {
        MessageId messageId;
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
        auto chunkMsgId =
            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
        ASSERT_TRUE(chunkMsgId);
        LOG_INFO("Send " << i << " to " << messageId);
        sendMessageIds.emplace_back(messageId);
    }

    // Consume: each received message must carry the full payload, must not look like a
    // batched message, and must also expose a ChunkMessageIdImpl.
    Message msg;
    std::vector<MessageId> receivedMessageIds;
    receivedMessageIds.reserve(numMessages);
    for (int i = 0; i < numMessages; i++) {
        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
        LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId());
        ASSERT_EQ(msg.getDataAsString(), largeMessage);
        ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
        ASSERT_EQ(msg.getMessageId().batchSize(), 0);
        auto messageId = msg.getMessageId();
        auto chunkMsgId =
            std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
        ASSERT_TRUE(chunkMsgId);
        receivedMessageIds.emplace_back(messageId);
        // The backlog check below depends on these acks, so a failed ack must not go unnoticed.
        ASSERT_EQ(ResultOk, consumer.acknowledge(messageId));
    }

    ASSERT_EQ(receivedMessageIds, sendMessageIds);
    // Compare the first and the last message: the original compared front() with itself,
    // which could never fail.
    ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.back().ledgerId());
    // NOTE(review): presumably every message occupies more than one entry, so the last
    // entry id exceeds the number of messages - confirm against the chunking configuration.
    ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);

    // Verify the cache has been cleared
    auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
    ASSERT_EQ(chunkedMessageCache.size(), 0);

    // Poll the broker stats until the backlog is reported as empty or the wait gives up.
    BrokerConsumerStats consumerStats;
    waitUntil(
        std::chrono::seconds(10),
        [&] {
            if (consumer.getBrokerConsumerStats(consumerStats) != ResultOk) return false;
            LOG_DEBUG(consumerStats);
            return consumerStats.getMsgBacklog() == 0;
        },
        1000);
    ASSERT_EQ(consumerStats.getMsgBacklog(), 0);  // Fails here: the message backlog doesn't get cleared

    producer.close();
    consumer.close();
}
What did you expect to see?
The message backlog should be cleared. The above test should pass.
What did you see instead?
The backlog doesn't get cleared.
Anything else?
The root cause is that the consumer doesn't acknowledge all of the chunk message IDs.
Are you willing to submit a PR?
- I'm willing to submit a PR!
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels