Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
messageIdPartitionMap.erase(exist);
}
return removed;
}
Expand All @@ -124,28 +125,27 @@ long UnAckedMessageTrackerEnabled::size() {

void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
if (msgIdInMap <= msgId) {
it->second.erase(msgIdInMap);
messageIdPartitionMap.erase(it++);
} else {
it++;
}
}
}

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
it->second.erase(msgIdInMap);
messageIdPartitionMap.erase(it++);
} else {
it++;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {

void clear();

private:
protected:
void timeoutHandlerHelper();
bool isEmpty();
long size();
Expand Down
205 changes: 205 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3828,3 +3828,208 @@ TEST(BasicEndToEndTest, testAckGroupingTrackerEnabledCumulativeAck) {
ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
}

class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
public:
UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr client, ConsumerImplBase &consumer)
: UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer) {}
const long getUnAckedMessagesTimeoutMs() { return this->timeoutMs_; }
const long getTickDurationInMs() { return this->tickDurationInMs_; }
bool isEmpty() { return UnAckedMessageTrackerEnabled::isEmpty(); }
long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock

TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A typo: testtUnAcked -> testUnAcked

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I need to resubmit the code to fix this problem?How about someone fix it when merge pull request?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a committer could fix it when merge PR. So you needn't any changes. @jiazhai could you help take a look?

Copy link
Member

@sijie sijie Dec 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saosir Can you create a follow-up PR to fix the typo?

ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);

UnAckedMessageTrackerDisabled tracker;
Message msg;
ASSERT_FALSE(tracker.add(msg.getMessageId()));
ASSERT_FALSE(tracker.remove(msg.getMessageId()));
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerDisabled) {
constexpr auto numMsg = 10;
const std::string topicName =
"testUnAckedMessageTrackerDisabledIndividualAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-disabled-ind-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);

Producer producer;
ProducerConfiguration configProducer;
configProducer.setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topicName, configProducer, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

UnAckedMessageTrackerDisabled tracker;
for (auto count = 0; count < numMsg; ++count) {
Message msg;

ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_FALSE(tracker.add(msg.getMessageId()));

consumer.acknowledge(msg.getMessageId());
ASSERT_FALSE(tracker.remove(msg.getMessageId()));
}
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << msg.getDataAsString();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledIndividualAck) {
constexpr auto numMsg = 10;
constexpr auto unAckedMessagesTimeoutMs = 1000;
const std::string topicName =
"testUnAckedMessageTrackerEnabledIndividualAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-enabled-ind-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}

auto tracker0 = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
clientImplPtr, consumerImpl0);
ASSERT_EQ(tracker0->getUnAckedMessagesTimeoutMs(), unAckedMessagesTimeoutMs);
ASSERT_EQ(tracker0->getTickDurationInMs(), unAckedMessagesTimeoutMs);

for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker0->add(recvMsgId[idx]));
}
ASSERT_EQ(numMsg, tracker0->size());
ASSERT_FALSE(tracker0->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(4));
ASSERT_EQ(0, tracker0->size());
ASSERT_TRUE(tracker0->isEmpty());
consumer.close();

ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
ASSERT_EQ(restMsgId.count(msg.getMessageId()), 1);
ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
}

auto tracker1 = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
clientImplPtr, consumerImpl1);
for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker1->add(recvMsgId[idx]));
ASSERT_TRUE(tracker1->remove(recvMsgId[idx]));
}
ASSERT_EQ(0, tracker1->size());
ASSERT_TRUE(tracker1->isEmpty());
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
constexpr auto numMsg = 10;
constexpr auto unAckedMessagesTimeoutMs = 1000;
const std::string topicName =
"testUnAckedMessageTrackerEnabledCumulativeAck" + std::to_string(time(nullptr));
const std::string subName = "sub-un-acked-msg-enabled-cum-ack";

// Setup client, producer and consumer.
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
auto tracker = std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs, clientImplPtr,
consumerImpl0);
for (auto idx = 0; idx < numMsg; ++idx) {
ASSERT_TRUE(tracker->add(recvMsgId[idx]));
}
ASSERT_EQ(numMsg, tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::sort(recvMsgId.begin(), recvMsgId.end());

auto targetMsgId = recvMsgId[numMsg / 2];
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(targetMsgId));
tracker->removeMessagesTill(targetMsgId);
ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
ASSERT_FALSE(tracker->isEmpty());

std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, tracker->size());
ASSERT_TRUE(tracker->isEmpty());
consumer.close();

std::this_thread::sleep_for(std::chrono::seconds(1));
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
}
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
consumer.close();
client.close();
}