Skip to content

Commit d372f55

Browse files
committed
[feat] Support partitioned topic reader.
1 parent 9ed6a45 commit d372f55

11 files changed

+232
-67
lines changed

lib/ClientImpl.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,15 +236,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
236236
return;
237237
}
238238

239-
if (partitionMetadata->getPartitions() > 0) {
240-
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
241-
callback(ResultOperationNotSupported, Reader());
242-
return;
243-
}
244-
245-
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
239+
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(),
240+
partitionMetadata->getPartitions(), conf,
246241
getListenerExecutorProvider()->get(), callback);
247-
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
242+
ConsumerImplBasePtr consumer = reader->getConsumer();
248243
auto self = shared_from_this();
249244
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
250245
auto consumer = weakConsumerPtr.lock();

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase {
124124
void negativeAcknowledge(const MessageId& msgId) override;
125125
bool isConnected() const override;
126126
uint64_t getNumberOfConnectedConsumer() override;
127+
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
127128

128129
virtual void disconnectConsumer();
129130
Result fetchSingleMessageFromBroker(Message& msg);
@@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase {
133134
virtual void redeliverMessages(const std::set<MessageId>& messageIds);
134135

135136
virtual bool isReadCompacted();
136-
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
137137
void beforeConnectionChange(ClientConnection& cnx) override;
138138

139139
protected:

lib/ConsumerImplBase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "HandlerBase.h"
3030

3131
namespace pulsar {
32+
typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
3233
class ConsumerImplBase;
3334
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
3435
class OpBatchReceive {
@@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
7677
virtual uint64_t getNumberOfConnectedConsumer() = 0;
7778
// overrided methods from HandlerBase
7879
virtual const std::string& getName() const override = 0;
80+
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;
7981

8082
protected:
8183
// overrided methods from HandlerBase

lib/MultiTopicsConsumerImpl.cc

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,20 @@ using namespace pulsar;
4040
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
4141
int numPartitions, const std::string& subscriptionName,
4242
const ConsumerConfiguration& conf,
43-
LookupServicePtr lookupServicePtr)
43+
LookupServicePtr lookupServicePtr,
44+
const Commands::SubscriptionMode subscriptionMode,
45+
boost::optional<MessageId> startMessageId)
4446
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
45-
lookupServicePtr) {
47+
lookupServicePtr, subscriptionMode, startMessageId) {
4648
topicsPartitions_[topicName->toString()] = numPartitions;
4749
}
4850

4951
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
5052
const std::string& subscriptionName, TopicNamePtr topicName,
5153
const ConsumerConfiguration& conf,
52-
LookupServicePtr lookupServicePtr)
54+
LookupServicePtr lookupServicePtr,
55+
const Commands::SubscriptionMode subscriptionMode,
56+
boost::optional<MessageId> startMessageId)
5357
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
5458
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
5559
client->getListenerExecutorProvider()->get()),
@@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
6064
messageListener_(conf.getMessageListener()),
6165
lookupServicePtr_(lookupServicePtr),
6266
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
63-
topics_(topics) {
67+
topics_(topics),
68+
subscriptionMode_(subscriptionMode),
69+
startMessageId_(startMessageId) {
6470
std::stringstream consumerStrStream;
6571
consumerStrStream << "[Muti Topics Consumer: "
6672
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
@@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
226232
// We don't have to add partition-n suffix
227233
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
228234
topicName->isPersistent(), internalListenerExecutor, true,
229-
NonPartitioned);
235+
NonPartitioned, subscriptionMode_, startMessageId_);
230236
consumer->getConsumerCreatedFuture().addListener(std::bind(
231237
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
232238
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
239245
std::string topicPartitionName = topicName->getTopicPartitionName(i);
240246
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
241247
topicName->isPersistent(), internalListenerExecutor,
242-
true, Partitioned);
248+
true, Partitioned, subscriptionMode_, startMessageId_);
243249
consumer->getConsumerCreatedFuture().addListener(std::bind(
244250
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
245251
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
686692
}
687693

688694
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
689-
callback(ResultOperationNotSupported);
695+
msgId.getTopicName();
696+
auto optConsumer = consumers_.find(msgId.getTopicName());
697+
if (optConsumer) {
698+
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
699+
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
700+
}
690701
}
691702

692703
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
@@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
10471058
partitionsUpdateTimer_->cancel(ec);
10481059
}
10491060
}
1061+
1062+
void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
1063+
if (incomingMessagesSize_ > 0) {
1064+
callback(ResultOk, true);
1065+
return;
1066+
}
1067+
1068+
auto hasMessageAvailable = std::make_shared<std::atomic<bool>>();
1069+
auto needCallBack = std::make_shared<std::atomic<int>>(consumers_.size());
1070+
auto self = get_shared_this_ptr();
1071+
1072+
consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
1073+
consumer->hasMessageAvailableAsync(
1074+
[self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
1075+
if (result != ResultOk) {
1076+
LOG_ERROR("Filed when acknowledge list: " << result);
1077+
// set needCallBack is -1 to avoid repeated callback.
1078+
needCallBack->store(-1);
1079+
callback(result, false);
1080+
return;
1081+
}
1082+
1083+
if (hasMsg) {
1084+
hasMessageAvailable->store(hasMsg);
1085+
}
1086+
1087+
if (--(*needCallBack) == 0) {
1088+
callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
1089+
}
1090+
});
1091+
});
1092+
}

lib/MultiTopicsConsumerImpl.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <vector>
2626

2727
#include "BlockingQueue.h"
28+
#include "Commands.h"
2829
#include "ConsumerImplBase.h"
2930
#include "Future.h"
3031
#include "Latch.h"
@@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
5354
public:
5455
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
5556
const std::string& subscriptionName, const ConsumerConfiguration& conf,
56-
LookupServicePtr lookupServicePtr);
57+
LookupServicePtr lookupServicePtr,
58+
const Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
59+
boost::optional<MessageId> startMessageId = boost::none);
60+
5761
MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
5862
const std::string& subscriptionName, TopicNamePtr topicName,
59-
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
63+
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
64+
const Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
65+
boost::optional<MessageId> startMessageId = boost::none);
6066

6167
~MultiTopicsConsumerImpl();
6268
// overrided methods from ConsumerImplBase
@@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
8894
void negativeAcknowledge(const MessageId& msgId) override;
8995
bool isConnected() const override;
9096
uint64_t getNumberOfConnectedConsumer() override;
97+
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
9198

9299
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
93100
size_t, BrokerConsumerStatsCallback);
@@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
118125
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
119126
const std::vector<std::string> topics_;
120127
std::queue<ReceiveCallback> pendingReceives_;
128+
const Commands::SubscriptionMode subscriptionMode_;
129+
boost::optional<MessageId> startMessageId_;
121130

122131
/* methods */
123132
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
@@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
167176

168177
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
169178
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
179+
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
170180
};
171181

172182
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;

lib/ReaderImpl.cc

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "ConsumerImpl.h"
2424
#include "ExecutorService.h"
2525
#include "GetLastMessageIdResponse.h"
26+
#include "MultiTopicsConsumerImpl.h"
2627
#include "TopicName.h"
2728

2829
namespace pulsar {
@@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader;
3536

3637
static ResultCallback emptyCallback;
3738

38-
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
39-
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
40-
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
39+
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
40+
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
41+
ReaderCallback readerCreatedCallback)
42+
: topic_(topic),
43+
partitions_(partitions),
44+
client_(client),
45+
readerConf_(conf),
46+
readerCreatedCallback_(readerCreatedCallback) {}
4147

4248
void ReaderImpl::start(const MessageId& startMessageId,
4349
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
@@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId,
8086
test::consumerConfigOfReader = consumerConf.clone();
8187
}
8288

83-
consumer_ = std::make_shared<ConsumerImpl>(
84-
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
85-
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
86-
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
89+
if (partitions_ > 0) {
90+
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
91+
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
92+
client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId);
93+
consumer_ = consumerImpl;
94+
} else {
95+
auto consumerImpl = std::make_shared<ConsumerImpl>(
96+
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
97+
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
98+
startMessageId);
99+
consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
100+
consumer_ = consumerImpl;
101+
}
87102
auto self = shared_from_this();
88103
consumer_->getConsumerCreatedFuture().addListener(
89104
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {

lib/ReaderImpl.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;
5858

5959
class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
6060
public:
61-
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
62-
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
61+
ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
62+
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
63+
ReaderCallback readerCreatedCallback);
6364

6465
void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);
6566

@@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
7374

7475
Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
7576

76-
ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
77+
ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }
7778

7879
void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
7980

@@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
9091
void acknowledgeIfNecessary(Result result, const Message& msg);
9192

9293
std::string topic_;
94+
int partitions_;
9395
ClientImplWeakPtr client_;
9496
ReaderConfiguration readerConf_;
95-
ConsumerImplPtr consumer_;
97+
ConsumerImplBasePtr consumer_;
9698
ReaderCallback readerCreatedCallback_;
9799
ReaderListener readerListener_;
98100
};

lib/UnAckedMessageTrackerEnabled.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
6464
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
6565
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
6666
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
67+
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
6768
};
6869
} // namespace pulsar
6970

tests/ConsumerTest.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) {
261261
ASSERT_EQ(0, result.size());
262262
}
263263

264+
TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) {
265+
Client client(lookupUrl);
266+
267+
const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr));
268+
const std::string subName = "sub";
269+
270+
int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions",
271+
std::to_string(2));
272+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
273+
274+
Consumer consumer;
275+
ConsumerConfiguration consumerConfiguration;
276+
consumerConfiguration.setUnAckedMessagesTimeoutMs(10000);
277+
ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));
278+
279+
Producer producer;
280+
ProducerConfiguration producerConfiguration;
281+
producerConfiguration.setBatchingEnabled(false);
282+
producerConfiguration.setPartitionsRoutingMode(
283+
ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution);
284+
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
285+
286+
const int numMessages = 100;
287+
for (int i = 0; i < numMessages; ++i) {
288+
Message msg = MessageBuilder().setContent(std::to_string(i)).build();
289+
ASSERT_EQ(ResultOk, producer.send(msg));
290+
}
291+
292+
Message msg;
293+
for (int i = 0; i < numMessages; i++) {
294+
ASSERT_EQ(ResultOk, consumer.receive(msg));
295+
// The last message of each partition topic be ACK
296+
if (i >= numMessages - 2) {
297+
consumer.acknowledgeCumulative(msg.getMessageId());
298+
}
299+
}
300+
ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000));
301+
302+
// Assert that there is no message in the tracker.
303+
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
304+
auto tracker =
305+
static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImpl->unAckedMessageTrackerPtr_.get());
306+
ASSERT_EQ(0, tracker->size());
307+
308+
client.close();
309+
}
310+
264311
TEST(ConsumerTest, consumerNotInitialized) {
265312
Consumer consumer;
266313

tests/PulsarFriend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class PulsarFriend {
9999
}
100100

101101
static ConsumerImplPtr getConsumer(Reader reader) {
102-
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
102+
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer());
103103
}
104104

105105
static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }

0 commit comments

Comments
 (0)