Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pulsar-client-cpp/lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ namespace pulsar {
* Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
* It can be directly used by consumers for non-persistent topics.
*/
class AckGroupingTracker {
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
public:
AckGroupingTracker() = default;
virtual ~AckGroupingTracker() = default;

/**
* Start tracking the ACK requests.
*/
virtual void start() {}

/**
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
* discard messages that are being resent after a disconnection and for which the user has
Expand Down Expand Up @@ -102,7 +107,7 @@ class AckGroupingTracker {
const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker

using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;
using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;

} // namespace pulsar
#endif /* LIB_ACKGROUPINGTRACKER_H_ */
21 changes: 14 additions & 7 deletions pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace pulsar {

DECLARE_LOG_OBJECT();

AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler,
uint64_t consumerId, long ackGroupingTimeMs,
long ackGroupingMaxSize)
AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr,
const HandlerBasePtr& handlerPtr, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTracker(),
handler_(handler),
handlerWeakPtr_(handlerPtr),
consumerId_(consumerId),
nextCumulativeAckMsgId_(MessageId::earliest()),
requireCumulativeAck_(false),
Expand All @@ -51,9 +51,10 @@ AckGroupingTrackerEnabled::AckGroupingTrackerEnabled(ClientImplPtr clientPtr, Ha
mutexTimer_() {
LOG_DEBUG("ACK grouping is enabled, grouping time " << ackGroupingTimeMs << "ms, grouping max size "
<< ackGroupingMaxSize);
this->scheduleTimer();
}

void AckGroupingTrackerEnabled::start() { this->scheduleTimer(); }

bool AckGroupingTrackerEnabled::isDuplicate(const MessageId& msgId) {
{
// Check if the message ID is already ACKed by a previous (or pending) cumulative request.
Expand Down Expand Up @@ -94,7 +95,12 @@ void AckGroupingTrackerEnabled::close() {
}

void AckGroupingTrackerEnabled::flush() {
auto cnx = this->handler_.getCnx().lock();
auto handler = handlerWeakPtr_.lock();
if (!handler) {
LOG_WARN("Reference to the HandlerBase is not valid.");
return;
}
auto cnx = handler->getCnx().lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, grouping ACK failed.");
return;
Expand Down Expand Up @@ -143,7 +149,8 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
std::lock_guard<std::mutex> lock(this->mutexTimer_);
this->timer_ = this->executor_->createDeadlineTimer();
this->timer_->expires_from_now(boost::posix_time::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
this->timer_->async_wait([this](const boost::system::error_code& ec) -> void {
auto self = shared_from_this();
this->timer_->async_wait([this, self](const boost::system::error_code& ec) -> void {
if (!ec) {
this->flush();
this->scheduleTimer();
Expand Down
7 changes: 4 additions & 3 deletions pulsar-client-cpp/lib/AckGroupingTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
/**
* Constructing ACK grouping tracker for peresistent topics.
* @param[in] clientPtr pointer to client object.
* @param[in] handler the connection handler.
* @param[in] handlerPtr the shared pointer to connection handler.
* @param[in] consumerId consumer ID that this tracker belongs to.
* @param[in] ackGroupingTimeMs ACK grouping time window in milliseconds.
* @param[in] ackGroupingMaxSize max. number of ACK requests can be grouped.
*/
AckGroupingTrackerEnabled(ClientImplPtr clientPtr, HandlerBase& handler, uint64_t consumerId,
AckGroupingTrackerEnabled(ClientImplPtr clientPtr, const HandlerBasePtr& handlerPtr, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize);

void start() override;
bool isDuplicate(const MessageId& msgId) override;
void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;
Expand All @@ -62,7 +63,7 @@ class AckGroupingTrackerEnabled : public AckGroupingTracker {
void scheduleTimer();

//! The connection handler.
HandlerBase& handler_;
HandlerBaseWeakPtr handlerWeakPtr_;

//! ID of the consumer that this tracker belongs to.
uint64_t consumerId_;
Expand Down
41 changes: 22 additions & 19 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
brokerConsumerStats_(),
consumerStatsBasePtr_(),
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
msgCrypto_(),
readCompacted_(conf.isReadCompacted()),
lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
Expand Down Expand Up @@ -102,23 +103,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
if (conf.isEncryptionEnabled()) {
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
}

// Initialize ACK grouping tracker.
if (TopicName::get(topic)->isPersistent()) {
// Persistent topic, ACK requests need to be sent to broker.
if (conf.getAckGroupingTimeMs() > 0) {
// Grouping ACK is ENABLED because grouping time value is a positive value.
this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
client, *this, this->consumerId_, conf.getAckGroupingTimeMs(), conf.getAckGroupingMaxSize()));
} else {
// Grouping ACK is DISABLED because grouping time value is a non-positive value.
this->ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, this->consumerId_));
}
} else {
// Non-persistent topic, ACK requests do NOT need to be sent to broker.
LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
this->ackGroupingTrackerPtr_.reset(new AckGroupingTracker());
}
}

ConsumerImpl::~ConsumerImpl() {
Expand Down Expand Up @@ -156,7 +140,24 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu

const std::string& ConsumerImpl::getTopic() const { return topic_; }

void ConsumerImpl::start() { grabCnx(); }
void ConsumerImpl::start() {
HandlerBase::start();

// Initialize ackGroupingTrackerPtr_ here because the shared_from_this() was not initialized until the
// constructor completed.
if (TopicName::get(topic_)->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
client_.lock(), shared_from_this(), consumerId_, config_.getAckGroupingTimeMs(),
config_.getAckGroupingMaxSize()));
} else {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerDisabled(*this, consumerId_));
}
} else {
LOG_INFO(getName() << "ACK will NOT be sent to broker for this non-persistent topic.");
}
ackGroupingTrackerPtr_->start();
}

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
Expand Down Expand Up @@ -885,7 +886,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
state_ = Closing;

// Flush pending grouped ACK requests.
this->ackGroupingTrackerPtr_->close();
if (ackGroupingTrackerPtr_) {
ackGroupingTrackerPtr_->close();
}

ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class ConsumerImpl : public ConsumerImplBase,
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
AckGroupingTrackerScopedPtr ackGroupingTrackerPtr_;
AckGroupingTrackerPtr ackGroupingTrackerPtr_;

MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
Expand Down
70 changes: 38 additions & 32 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3515,6 +3515,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerSingleAckBehavior) {

// Send ACK.
AckGroupingTrackerMock tracker(false);
tracker.start();
for (auto msgIdx = 0; msgIdx < numMsg; ++msgIdx) {
auto connPtr = connWeakPtr.lock();
ASSERT_NE(connPtr, nullptr);
Expand Down Expand Up @@ -3565,6 +3566,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerMultiAckBehavior) {

// Send ACK.
AckGroupingTrackerMock tracker(false);
tracker.start();
std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
ASSERT_EQ(restMsgId.size(), numMsg);
ASSERT_TRUE(tracker.callDoImmediateAck(connWeakPtr, consumerImpl.getConsumerId(), restMsgId));
Expand Down Expand Up @@ -3671,9 +3673,10 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerDisabledCumulativeAck) {

class AckGroupingTrackerEnabledMock : public AckGroupingTrackerEnabled {
public:
AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, HandlerBase &handler, uint64_t consumerId,
long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTrackerEnabled(clientPtr, handler, consumerId, ackGroupingTimeMs, ackGroupingMaxSize) {}
AckGroupingTrackerEnabledMock(ClientImplPtr clientPtr, const HandlerBasePtr &handlerPtr,
uint64_t consumerId, long ackGroupingTimeMs, long ackGroupingMaxSize)
: AckGroupingTrackerEnabled(clientPtr, handlerPtr, consumerId, ackGroupingTimeMs,
ackGroupingMaxSize) {}
const std::set<MessageId> &getPendingIndividualAcks() { return this->pendingIndividualAcks_; }
const long getAckGroupingTimeMs() { return this->ackGroupingTimeMs_; }
const long getAckGroupingMaxSize() { return this->ackGroupingMaxSize_; }
Expand All @@ -3700,7 +3703,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl = PulsarFriend::getConsumerImplPtr(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Expand All @@ -3715,22 +3718,23 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledIndividualAck) {
recvMsgId.emplace_back(msg.getMessageId());
}

AckGroupingTrackerEnabledMock tracker(clientImplPtr, consumerImpl, consumerImpl.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker.getAckGroupingTimeMs(), ackGroupingTimeMs);
ASSERT_EQ(tracker.getAckGroupingMaxSize(), ackGroupingMaxSize);
auto tracker = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl, consumerImpl->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker->start();
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker->getAckGroupingTimeMs(), ackGroupingTimeMs);
ASSERT_EQ(tracker->getAckGroupingMaxSize(), ackGroupingMaxSize);
for (auto &msgId : recvMsgId) {
ASSERT_FALSE(tracker.isDuplicate(msgId));
tracker.addAcknowledge(msgId);
ASSERT_TRUE(tracker.isDuplicate(msgId));
ASSERT_FALSE(tracker->isDuplicate(msgId));
tracker->addAcknowledge(msgId);
ASSERT_TRUE(tracker->isDuplicate(msgId));
}
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), recvMsgId.size());
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), recvMsgId.size());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(tracker.getPendingIndividualAcks().size(), 0);
ASSERT_EQ(tracker->getPendingIndividualAcks().size(), 0);
for (auto &msgId : recvMsgId) {
ASSERT_FALSE(tracker.isDuplicate(msgId));
ASSERT_FALSE(tracker->isDuplicate(msgId));
}
consumer.close();

Expand Down Expand Up @@ -3759,7 +3763,7 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl0 = PulsarFriend::getConsumerImplPtr(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Expand All @@ -3775,32 +3779,33 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
}
std::sort(recvMsgId.begin(), recvMsgId.end());

AckGroupingTrackerEnabledMock tracker0(clientImplPtr, consumerImpl0, consumerImpl0.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), MessageId::earliest());
ASSERT_FALSE(tracker0.requireCumulativeAck());
auto tracker0 = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl0, consumerImpl0->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker0->start();
ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), MessageId::earliest());
ASSERT_FALSE(tracker0->requireCumulativeAck());

auto targetMsgId = recvMsgId[numMsg / 2];
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_FALSE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_FALSE(tracker0->isDuplicate(recvMsgId[idx]));
}
tracker0.addAcknowledgeCumulative(targetMsgId);
tracker0->addAcknowledgeCumulative(targetMsgId);
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
ASSERT_EQ(tracker0.getNextCumulativeAckMsgId(), targetMsgId);
ASSERT_TRUE(tracker0.requireCumulativeAck());
ASSERT_EQ(tracker0->getNextCumulativeAckMsgId(), targetMsgId);
ASSERT_TRUE(tracker0->requireCumulativeAck());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_FALSE(tracker0.requireCumulativeAck());
ASSERT_FALSE(tracker0->requireCumulativeAck());
for (auto idx = 0; idx <= numMsg / 2; ++idx) {
ASSERT_TRUE(tracker0.isDuplicate(recvMsgId[idx]));
ASSERT_TRUE(tracker0->isDuplicate(recvMsgId[idx]));
}
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
auto consumerImpl1 = PulsarFriend::getConsumerImplPtr(consumer);
std::set<MessageId> restMsgId(recvMsgId.begin() + numMsg / 2 + 1, recvMsgId.end());
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
Expand All @@ -3810,10 +3815,11 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
AckGroupingTrackerEnabledMock tracker1(clientImplPtr, consumerImpl1, consumerImpl1.getConsumerId(),
ackGroupingTimeMs, ackGroupingMaxSize);
tracker1.addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
tracker1.close();
auto tracker1 = std::make_shared<AckGroupingTrackerEnabledMock>(
clientImplPtr, consumerImpl1, consumerImpl1->getConsumerId(), ackGroupingTimeMs, ackGroupingMaxSize);
tracker1->start();
tracker1->addAcknowledgeCumulative(recvMsgId[numMsg - 1]);
tracker1->close();
consumer.close();

ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class PulsarFriend {
return *consumerImpl;
}

static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}

static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }

static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
Expand Down