Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>logs/**</exclude>
<exclude>**/circe/**</exclude>
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
<exclude>pulsar-client-cpp/include/gtest/gtest_prod.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ lib*.so*
.settings/
.pydevproject
.idea/
.vs/
*.cbp
*.ninja*
.clangd/
Expand Down
60 changes: 60 additions & 0 deletions pulsar-client-cpp/include/gtest/gtest_prod.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2006, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//
// Google C++ Testing and Mocking Framework definitions useful in production code.
// GOOGLETEST_CM0003 DO NOT DELETE

#ifndef GTEST_INCLUDE_GTEST_GTEST_PROD_H_
#define GTEST_INCLUDE_GTEST_GTEST_PROD_H_

// When you need to test the private or protected members of a class,
// use the FRIEND_TEST macro to declare your tests as friends of the
// class. For example:
//
// class MyClass {
// private:
// void PrivateMethod();
// FRIEND_TEST(MyClassTest, PrivateMethodWorks);
// };
//
// class MyClassTest : public testing::Test {
// // ...
// };
//
// TEST_F(MyClassTest, PrivateMethodWorks) {
// // Can call MyClass::PrivateMethod() here.
// }
//
// Note: The test class must be in the same namespace as the class being tested.
// For example, putting MyClassTest in an anonymous namespace will not work.

#define FRIEND_TEST(test_case_name, test_name) friend class test_case_name##_##test_name##_Test

#endif // GTEST_INCLUDE_GTEST_GTEST_PROD_H_
15 changes: 11 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId)
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
Expand All @@ -46,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
messageListener_(config_.getMessageListener()),
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId),
Expand Down Expand Up @@ -563,7 +565,6 @@ void ConsumerImpl::internalListener() {
// This will only happen when the connection got reset and we cleared the queue
return;
}
unAckedMessageTrackerPtr_->add(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
Expand Down Expand Up @@ -638,7 +639,6 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
callback(ResultOk, msg);
} else {
pendingReceives_.push(callback);
Expand Down Expand Up @@ -672,7 +672,6 @@ Result ConsumerImpl::receiveHelper(Message& msg) {

incomingMessages_.pop(msg);
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
}

Expand Down Expand Up @@ -702,7 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {

if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
messageProcessed(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
return ResultTimeout;
Expand All @@ -720,6 +718,7 @@ void ConsumerImpl::messageProcessed(Message& msg) {
}

increaseAvailablePermits(currentCnx);
trackMessage(msg);
}

/**
Expand Down Expand Up @@ -1232,4 +1231,12 @@ void ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
negativeAcksTracker_.setEnabledForTesting(enabled);
}

void ConsumerImpl::trackMessage(const Message& msg) {
if (hasParent_) {
unAckedMessageTrackerPtr_->remove(msg.getMessageId());
} else {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
}

} /* namespace pulsar */
9 changes: 7 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ConsumerImpl : public ConsumerImplBase,
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration&,
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(),
const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
Optional<MessageId> startMessageId = Optional<MessageId>::empty());
Expand Down Expand Up @@ -166,6 +166,7 @@ class ConsumerImpl : public ConsumerImplBase,
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
void trackMessage(const Message& msg);

Optional<MessageId> clearReceiveQueue();

Expand All @@ -175,6 +176,7 @@ class ConsumerImpl : public ConsumerImplBase,
std::string originalSubscriptionName_;
MessageListener messageListener_;
ExecutorServicePtr listenerExecutor_;
bool hasParent_;
ConsumerTopicType consumerTopicType_;

Commands::SubscriptionMode subscriptionMode_;
Expand All @@ -192,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase,
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
Expand All @@ -218,6 +220,9 @@ class ConsumerImpl : public ConsumerImplBase,
// these two declared friend to access setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
friend class PartitionedConsumerImpl;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};

} /* namespace pulsar */
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
internalListenerExecutor, NonPartitioned);
internalListenerExecutor, true, NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand All @@ -193,7 +193,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
internalListenerExecutor, Partitioned);
internalListenerExecutor, true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand Down
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include "BlockingQueue.h"
Expand Down Expand Up @@ -103,7 +104,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
std::queue<ReceiveCallback> pendingReceives_;

Expand Down Expand Up @@ -137,7 +138,10 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,

private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
} // namespace pulsar
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partit

std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
internalListenerExecutor_, Partitioned);
internalListenerExecutor_, true, Partitioned);

const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
consumer->getConsumerCreatedFuture().addListener(std::bind(
Expand Down
7 changes: 6 additions & 1 deletion pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef PULSAR_PARTITIONED_CONSUMER_HEADER
#define PULSAR_PARTITIONED_CONSUMER_HEADER
#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include <vector>
Expand Down Expand Up @@ -118,11 +119,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
std::queue<ReceiveCallback> pendingReceives_;
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult);

friend class PulsarFriend;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
};
typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
}

consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
#define LIB_UNACKEDMESSAGETRACKERENABLED_H_
#include "gtest/gtest_prod.h"
#include "lib/UnAckedMessageTrackerInterface.h"

#include <mutex>
Expand Down Expand Up @@ -48,6 +49,9 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
ClientImplPtr client_;
long timeoutMs_;
long tickDurationInMs_;

FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};
} // namespace pulsar

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ class UnAckedMessageTrackerInterface {
virtual void removeTopicMessage(const std::string& topic) = 0;
};

typedef std::unique_ptr<UnAckedMessageTrackerInterface> UnAckedMessageTrackerScopedPtr;
using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface>;
} // namespace pulsar
#endif /* LIB_UNACKEDMESSAGETRACKERINTERFACE_H_ */
5 changes: 3 additions & 2 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3825,7 +3825,7 @@ class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock

TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
TEST(BasicEndToEndTest, testUnAckedMessageTrackerDefaultBehavior) {
ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
Expand Down Expand Up @@ -4002,7 +4002,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(4));
ASSERT_EQ(0, tracker->size());
ASSERT_TRUE(tracker->isEmpty());
consumer.close();
Expand All @@ -4012,6 +4012,7 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
}
Message msg;
auto ret = consumer.receive(msg, 1000);
Expand Down
Loading