Skip to content

Commit d0d7e39

Browse files
authored
[feat] Support pattern subscribe non-persistent topic. (#207)
* [feat] Support pattern subscribe non-persistent topic. * Fix code reviews.
1 parent cb82141 commit d0d7e39

26 files changed

+352
-108
lines changed

include/pulsar/ConsumerConfiguration.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <pulsar/InitialPosition.h>
2828
#include <pulsar/KeySharedPolicy.h>
2929
#include <pulsar/Message.h>
30+
#include <pulsar/RegexSubscriptionMode.h>
3031
#include <pulsar/Result.h>
3132
#include <pulsar/Schema.h>
3233
#include <pulsar/TypedMessage.h>
@@ -383,6 +384,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
383384
*/
384385
int getPatternAutoDiscoveryPeriod() const;
385386

387+
/**
388+
* Determines which topics this consumer should be subscribed to - Persistent, Non-Persistent, or
389+
* AllTopics. Only used with pattern subscriptions.
390+
*
391+
* @param regexSubscriptionMode The default value is `PersistentOnly`.
392+
*/
393+
ConsumerConfiguration& setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode);
394+
395+
/**
396+
* @return the regex subscription mode for the pattern consumer.
397+
*/
398+
RegexSubscriptionMode getRegexSubscriptionMode() const;
399+
386400
/**
387401
* The default value is `InitialPositionLatest`.
388402
*
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef PULSAR_CPP_REGEX_SUB_MODE_H
20+
#define PULSAR_CPP_REGEX_SUB_MODE_H
21+
22+
namespace pulsar {
23+
enum RegexSubscriptionMode
24+
{
25+
/**
26+
* Only subscribe to persistent topics.
27+
*/
28+
PersistentOnly = 0,
29+
30+
/**
31+
* Only subscribe to non-persistent topics.
32+
*/
33+
NonPersistentOnly = 1,
34+
35+
/**
36+
* Subscribe to both persistent and non-persistent topics.
37+
*/
38+
AllTopics = 2
39+
};
40+
}
41+
42+
#endif // PULSAR_CPP_REGEX_SUB_MODE_H

lib/BinaryProtoLookupService.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ uint64_t BinaryProtoLookupService::newRequestId() {
151151
}
152152

153153
Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
154-
const NamespaceNamePtr& nsName) {
154+
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) {
155155
NamespaceTopicsPromisePtr promise = std::make_shared<Promise<Result, NamespaceTopicsPtr>>();
156156
if (!nsName) {
157157
promise->setFailed(ResultInvalidTopicName);
@@ -160,7 +160,7 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
160160
std::string namespaceName = nsName->toString();
161161
cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
162162
.addListener(std::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
163-
namespaceName, std::placeholders::_1, std::placeholders::_2, promise));
163+
namespaceName, mode, std::placeholders::_1, std::placeholders::_2, promise));
164164
return promise->getFuture();
165165
}
166166

@@ -201,7 +201,9 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName
201201
});
202202
}
203203

204-
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
204+
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName,
205+
CommandGetTopicsOfNamespace_Mode mode,
206+
Result result,
205207
const ClientConnectionWeakPtr& clientCnx,
206208
NamespaceTopicsPromisePtr promise) {
207209
if (result != ResultOk) {
@@ -212,8 +214,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
212214
ClientConnectionPtr conn = clientCnx.lock();
213215
uint64_t requestId = newRequestId();
214216
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);
215-
216-
conn->newGetTopicsOfNamespace(nsName, requestId)
217+
conn->newGetTopicsOfNamespace(nsName, mode, requestId)
217218
.addListener(std::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this,
218219
std::placeholders::_1, std::placeholders::_2, promise));
219220
}

lib/BinaryProtoLookupService.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4949

5050
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override;
5151

52-
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
52+
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
53+
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;
5354

5455
Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
5556

@@ -75,8 +76,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
7576
const ClientConnectionWeakPtr& clientCnx,
7677
LookupDataResultPromisePtr promise);
7778

78-
void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
79-
const ClientConnectionWeakPtr& clientCnx,
79+
void sendGetTopicsOfNamespaceRequest(const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode,
80+
Result result, const ClientConnectionWeakPtr& clientCnx,
8081
NamespaceTopicsPromisePtr promise);
8182

8283
void sendGetSchemaRequest(const std::string& topiName, Result result,

lib/ClientConnection.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,8 +1293,8 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
12931293
return promise.getFuture();
12941294
}
12951295

1296-
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
1297-
uint64_t requestId) {
1296+
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
1297+
const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
12981298
Lock lock(mutex_);
12991299
Promise<Result, NamespaceTopicsPtr> promise;
13001300
if (isClosed()) {
@@ -1306,7 +1306,7 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
13061306

13071307
pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise));
13081308
lock.unlock();
1309-
sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
1309+
sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
13101310
return promise.getFuture();
13111311
}
13121312

lib/ClientConnection.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
181181

182182
Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t consumerId, uint64_t requestId);
183183

184-
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);
184+
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName,
185+
CommandGetTopicsOfNamespace_Mode mode,
186+
uint64_t requestId);
185187

186188
Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
187189
uint64_t requestId);

lib/ClientImpl.cc

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -338,29 +338,53 @@ void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const
338338
}
339339
}
340340

341-
NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
341+
if (TopicName::containsDomain(regexPattern)) {
342+
LOG_WARN("Ignore invalid domain: "
343+
<< topicNamePtr->getDomain()
344+
<< ", use the RegexSubscriptionMode parameter to set the topic type");
345+
}
346+
347+
CommandGetTopicsOfNamespace_Mode mode;
348+
auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
349+
switch (regexSubscriptionMode) {
350+
case PersistentOnly:
351+
mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
352+
break;
353+
case NonPersistentOnly:
354+
mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
355+
break;
356+
case AllTopics:
357+
mode = CommandGetTopicsOfNamespace_Mode_ALL;
358+
break;
359+
default:
360+
LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode);
361+
callback(ResultInvalidConfiguration, Consumer());
362+
return;
363+
}
342364

343-
lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
344-
std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(), std::placeholders::_1,
345-
std::placeholders::_2, regexPattern, subscriptionName, conf, callback));
365+
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
366+
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
367+
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
368+
subscriptionName, conf, callback));
346369
}
347370

348371
void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
349372
const std::string& regexPattern,
373+
CommandGetTopicsOfNamespace_Mode mode,
350374
const std::string& subscriptionName,
351375
const ConsumerConfiguration& conf,
352376
SubscribeCallback callback) {
353377
if (result == ResultOk) {
354378
ConsumerImplBasePtr consumer;
355379

356-
PULSAR_REGEX_NAMESPACE::regex pattern(regexPattern);
380+
PULSAR_REGEX_NAMESPACE::regex pattern(TopicName::removeDomain(regexPattern));
357381

358382
NamespaceTopicsPtr matchTopics =
359383
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);
360384

361385
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
362386

363-
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern,
387+
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
364388
*matchTopics, subscriptionName, conf,
365389
lookupServicePtr_, interceptors);
366390

lib/ClientImpl.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "Future.h"
2929
#include "LookupDataResult.h"
3030
#include "MemoryLimitController.h"
31+
#include "ProtoApiEnums.h"
3132
#include "ServiceNameResolver.h"
3233
#include "SynchronizedHashMap.h"
3334

@@ -151,8 +152,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
151152
void handleClose(Result result, SharedInt remaining, ResultCallback callback);
152153

153154
void createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
154-
const std::string& regexPattern, const std::string& consumerName,
155-
const ConsumerConfiguration& conf, SubscribeCallback callback);
155+
const std::string& regexPattern,
156+
CommandGetTopicsOfNamespace_Mode mode,
157+
const std::string& consumerName, const ConsumerConfiguration& conf,
158+
SubscribeCallback callback);
156159

157160
enum State
158161
{

lib/Commands.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -609,12 +609,14 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t consumerId, uint64_t request
609609
return buffer;
610610
}
611611

612-
SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId) {
612+
SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName,
613+
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) {
613614
BaseCommand cmd;
614615
cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
615616
CommandGetTopicsOfNamespace* getTopics = cmd.mutable_gettopicsofnamespace();
616617
getTopics->set_request_id(requestId);
617618
getTopics->set_namespace_(nsName);
619+
getTopics->set_mode(static_cast<proto::CommandGetTopicsOfNamespace_Mode>(mode));
618620

619621
const SharedBuffer buffer = writeMessageWithSize(cmd);
620622
cmd.clear_gettopicsofnamespace();

lib/Commands.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ class Commands {
156156
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const MessageId& messageId);
157157
static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, uint64_t timestamp);
158158
static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t requestId);
159-
static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);
159+
static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName,
160+
CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId);
160161

161162
static bool peerSupportsGetLastMessageId(int32_t peerVersion);
162163
static bool peerSupportsActiveConsumerListener(int32_t peerVersion);

0 commit comments

Comments
 (0)