Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
if (ackGroupingTrackerPtr_) {
ackGroupingTrackerPtr_->close();
}
negativeAcksTracker_.close();

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
Expand Down Expand Up @@ -1219,6 +1220,7 @@ void ConsumerImpl::shutdown() {
if (client) {
client->cleanupConsumer(this);
}
negativeAcksTracker_.close();
cancelTimers();
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
failPendingReceiveCallback();
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ class ConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
FRIEND_TEST(DeadLetterQueueTest, testAutoSetDLQTopicName);
};

Expand Down
2 changes: 2 additions & 0 deletions lib/NegativeAcksTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void NegativeAcksTracker::close() {
boost::system::error_code ec;
timer_->cancel(ec);
}
timer_ = nullptr;
nackedMessages_.clear();
}

void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
Expand Down
4 changes: 4 additions & 0 deletions lib/NegativeAcksTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include <memory>
#include <mutex>

#include "TestUtil.h"

namespace pulsar {

class ConsumerImpl;
Expand Down Expand Up @@ -66,6 +68,8 @@ class NegativeAcksTracker {
ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
bool enabledForTesting_; // to be able to test deterministically

FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
};

} // namespace pulsar
30 changes: 30 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,36 @@ TEST_P(ConsumerSeekTest, testSeekForMessageId) {
producer.close();
}

TEST(ConsumerTest, testNegativeAcksTrackerClose) {
Client client(lookupUrl);
auto topicName = "testNegativeAcksTrackerClose";

ConsumerConfiguration consumerConfig;
consumerConfig.setNegativeAckRedeliveryDelayMs(100);
Consumer consumer;
client.subscribe(topicName, "test-sub", consumerConfig, consumer);

Producer producer;
client.createProducer(topicName, producer);

for (int i = 0; i < 10; ++i) {
producer.send(MessageBuilder().setContent(std::to_string(i)).build());
}

Message msg;
PulsarFriend::setNegativeAckEnabled(consumer, false);
for (int i = 0; i < 10; ++i) {
consumer.receive(msg);
consumer.negativeAcknowledge(msg);
}

consumer.close();
auto consumerImplPtr = PulsarFriend::getConsumerImplPtr(consumer);
ASSERT_TRUE(consumerImplPtr->negativeAcksTracker_.nackedMessages_.empty());

client.close();
}

INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));

} // namespace pulsar