Skip to content

Commit

Permalink
[feat] PIP-188 Support blue-green migration (#402)
Browse files Browse the repository at this point in the history
### Motivation

Support blue-green cluster migration (PIP-188) in the C++ client.

### Modifications

- Added the blue-green migration client logic.
- Register the producer instance in the producers map before sending the producer creation command. This is required because the broker could send a topic migration command while the producer is still being created.
  • Loading branch information
heesung-sn authored Feb 21, 2024
1 parent 7891ac5 commit 543e51c
Show file tree
Hide file tree
Showing 23 changed files with 487 additions and 127 deletions.
1 change: 0 additions & 1 deletion lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "ConnectionPool.h"
#include "LogUtils.h"
#include "NamespaceName.h"
#include "ServiceNameResolver.h"
#include "TopicName.h"

DECLARE_LOG_OBJECT()
Expand Down
8 changes: 5 additions & 3 deletions lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;

class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool,
BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool,
const ClientConfiguration& clientConfiguration)
: serviceNameResolver_(serviceNameResolver),
: serviceNameResolver_(serviceUrl),
cnxPool_(pool),
listenerName_(clientConfiguration.getListenerName()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
Expand All @@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {

Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const std::string& version) override;

// Returns a mutable reference to the ServiceNameResolver member that this lookup service
// initializes from the service URL passed to its constructor.
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }

protected:
// Mark findBroker as protected to make it accessible from test.
LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic,
Expand All @@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
std::mutex mutex_;
uint64_t requestIdGenerator_ = 0;

ServiceNameResolver& serviceNameResolver_;
ServiceNameResolver serviceNameResolver_;
ConnectionPool& cnxPool_;
std::string listenerName_;
const int32_t maxLookupRedirects_;
Expand Down
57 changes: 56 additions & 1 deletion lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::
LOG_INFO(cnxString_ << "Connected to broker");
} else {
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
<< ", proxy: " << proxyServiceUrl_);
<< ", proxy: " << proxyServiceUrl_
<< ", physical address:" << physicalAddress_);
}

Lock lock(mutex_);
Expand Down Expand Up @@ -945,6 +946,10 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
handleError(incomingCmd.error());
break;

case BaseCommand::TOPIC_MIGRATED:
handleTopicMigrated(incomingCmd.topicmigrated());
break;

case BaseCommand::CLOSE_PRODUCER:
handleCloseProducer(incomingCmd.close_producer());
break;
Expand Down Expand Up @@ -1761,6 +1766,56 @@ void ClientConnection::handleError(const proto::CommandError& error) {
}
}

/**
 * Pick the migrated broker service URL that matches this connection's transport.
 *
 * When the connection has a TLS socket only the TLS URL of the command is considered;
 * otherwise only the plain-text URL is considered.
 *
 * @param commandTopicMigrated the TOPIC_MIGRATED command received from the broker
 * @return the matching URL, or an empty string when the command does not carry one
 */
std::string ClientConnection::getMigratedBrokerServiceUrl(
    const proto::CommandTopicMigrated& commandTopicMigrated) {
    const bool overTls = tlsSocket_ ? true : false;

    if (overTls && commandTopicMigrated.has_brokerserviceurltls()) {
        return commandTopicMigrated.brokerserviceurltls();
    }
    if (!overTls && commandTopicMigrated.has_brokerserviceurl()) {
        return commandTopicMigrated.brokerserviceurl();
    }
    // No URL available for the transport in use.
    return "";
}

/**
 * Handle a TOPIC_MIGRATED command received on this connection.
 *
 * The migrated broker URL matching this connection's transport is extracted from the
 * command and stored, via setRedirectedClusterURI(), on the producer or consumer whose
 * id equals the command's resource id. The command is ignored (with a warning) when it
 * carries no usable URL, when the resource id is not registered on this connection, or
 * when the registered handler has already been destroyed.
 *
 * @param commandTopicMigrated the command sent by the broker
 */
void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& commandTopicMigrated) {
    const long resourceId = commandTopicMigrated.resource_id();
    const std::string migratedBrokerServiceUrl = getMigratedBrokerServiceUrl(commandTopicMigrated);

    if (migratedBrokerServiceUrl.empty()) {
        LOG_WARN("Failed to find the migrated broker url for resource:"
                 << resourceId
                 << (commandTopicMigrated.has_brokerserviceurl()
                         ? ", migratedBrokerUrl: " + commandTopicMigrated.brokerserviceurl()
                         : "")
                 << (commandTopicMigrated.has_brokerserviceurltls()
                         ? ", migratedBrokerUrlTls: " + commandTopicMigrated.brokerserviceurltls()
                         : ""));
        return;
    }

    Lock lock(mutex_);
    if (commandTopicMigrated.resource_type() == proto::CommandTopicMigrated_ResourceType_Producer) {
        auto it = producers_.find(resourceId);
        if (it != producers_.end()) {
            auto producer = it->second.lock();
            // lock() yields an empty pointer if the producer was destroyed while its id is
            // still registered here; dereferencing it would crash the I/O thread.
            if (producer) {
                producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
                LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
            } else {
                LOG_WARN("Ignoring topicMigrated command for already destroyed producer id: " << resourceId);
            }
        } else {
            LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId);
        }
    } else {
        auto it = consumers_.find(resourceId);
        if (it != consumers_.end()) {
            auto consumer = it->second.lock();
            // Same expiry guard as for producers above.
            if (consumer) {
                consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
                LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl);
            } else {
                LOG_WARN("Ignoring topicMigrated command for already destroyed consumer id: " << resourceId);
            }
        } else {
            LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId);
        }
    }
}

boost::optional<std::string> ClientConnection::getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer) {
if (tlsSocket_) {
Expand Down
9 changes: 5 additions & 4 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class BrokerEntryMetadata;
class CommandActiveConsumerChange;
class CommandAckResponse;
class CommandMessage;
class CommandTopicMigrated;
class CommandCloseConsumer;
class CommandCloseProducer;
class CommandConnected;
Expand Down Expand Up @@ -414,17 +415,17 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
void handleProducerSuccess(const proto::CommandProducerSuccess&);
void handleError(const proto::CommandError&);
void handleTopicMigrated(const proto::CommandTopicMigrated&);
void handleCloseProducer(const proto::CommandCloseProducer&);
void handleCloseConsumer(const proto::CommandCloseConsumer&);
void handleAuthChallenge();
void handleGetLastMessageIdResponse(const proto::CommandGetLastMessageIdResponse&);
void handleGetTopicOfNamespaceResponse(const proto::CommandGetTopicsOfNamespaceResponse&);
void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
void handleAckResponse(const proto::CommandAckResponse&);
boost::optional<std::string> getAssignedBrokerServiceUrl(
const proto::CommandCloseProducer& closeProducer);
boost::optional<std::string> getAssignedBrokerServiceUrl(
const proto::CommandCloseConsumer& closeConsumer);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&);
boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&);
std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&);
};
} // namespace pulsar

Expand Down
56 changes: 40 additions & 16 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ typedef std::vector<std::string> StringList;
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
: mutex_(),
state_(Open),
serviceNameResolver_(serviceUrl),
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
clientConfiguration_(ClientConfiguration(clientConfiguration)
.setUseTls(ServiceNameResolver::useTls(ServiceURI(serviceUrl)))),
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
Expand All @@ -98,25 +98,28 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
}
lookupServicePtr_ = createLookup(serviceUrl);
}

ClientImpl::~ClientImpl() { shutdown(); }

LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
LookupServicePtr underlyingLookupServicePtr;
if (serviceNameResolver_.useHttp()) {
if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
LOG_DEBUG("Using HTTP Lookup");
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
}

lookupServicePtr_ = RetryableLookupService::create(
auto lookupServicePtr = RetryableLookupService::create(
underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}

ClientImpl::~ClientImpl() { shutdown(); }

const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }

MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
Expand All @@ -129,7 +132,21 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
return partitionListenerExecutorProvider_;
}

LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
/**
 * Return the lookup service to use for the given cluster.
 *
 * An empty URI selects the client's primary lookup service. Any other URI selects a
 * lookup service dedicated to that redirected cluster; it is created on first use and
 * cached, so later calls with the same URI return the same instance.
 *
 * @param redirectedClusterURI service URL of the redirected cluster, or "" for the primary one
 * @return the lookup service for that cluster
 */
LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) {
    if (redirectedClusterURI.empty()) {
        return lookupServicePtr_;
    }

    // The cache of per-cluster lookup services is accessed under the client mutex.
    Lock lock(mutex_);
    const auto cached = redirectedClusterLookupServicePtrs_.find(redirectedClusterURI);
    if (cached != redirectedClusterLookupServicePtrs_.end()) {
        return cached->second;
    }

    auto created = createLookup(redirectedClusterURI);
    redirectedClusterLookupServicePtrs_.emplace(redirectedClusterURI, created);
    return created;
}

void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback, bool autoDownloadSchema) {
Expand Down Expand Up @@ -517,7 +534,8 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}

GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI,
const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;

const auto topicNamePtr = TopicName::get(topic);
Expand All @@ -528,7 +546,8 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
}

auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
getLookup(redirectedClusterURI)
->getBroker(*topicNamePtr)
.addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
Expand All @@ -554,16 +573,18 @@ GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t k
return promise.getFuture();
}

const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddress) {
const std::string& ClientImpl::getPhysicalAddress(const std::string& redirectedClusterURI,
const std::string& logicalAddress) {
if (useProxy_) {
return serviceNameResolver_.resolveHost();
return getLookup(redirectedClusterURI)->getServiceNameResolver().resolveHost();
} else {
return logicalAddress;
}
}

GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
GetConnectionFuture ClientImpl::connect(const std::string& redirectedClusterURI,
const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, logicalAddress);
Promise<Result, ClientConnectionPtr> promise;
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
Expand Down Expand Up @@ -633,6 +654,9 @@ void ClientImpl::closeAsync(CloseCallback callback) {

memoryLimitController_.close();
lookupServicePtr_->close();
for (const auto& it : redirectedClusterLookupServicePtrs_) {
it.second->close();
}

auto producers = producers_.move();
auto consumers = consumers_.move();
Expand Down
15 changes: 10 additions & 5 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

// Use virtual method to test
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);
virtual GetConnectionFuture getConnection(const std::string& redirectedClusterURI,
const std::string& topic, size_t key);

GetConnectionFuture connect(const std::string& logicalAddress, size_t key);
GetConnectionFuture connect(const std::string& redirectedClusterURI, const std::string& logicalAddress,
size_t key);

void closeAsync(CloseCallback callback);
void shutdown();
Expand All @@ -119,7 +121,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getIOExecutorProvider();
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
LookupServicePtr getLookup();
LookupServicePtr getLookup(const std::string& redirectedClusterURI = "");

void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }

Expand Down Expand Up @@ -165,7 +167,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
const std::string& consumerName, const ConsumerConfiguration& conf,
SubscribeCallback callback);

const std::string& getPhysicalAddress(const std::string& logicalAddress);
const std::string& getPhysicalAddress(const std::string& redirectedClusterURI,
const std::string& logicalAddress);

LookupServicePtr createLookup(const std::string& serviceUrl);

static std::string getClientVersion(const ClientConfiguration& clientConfiguration);

Expand All @@ -179,7 +184,6 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
std::mutex mutex_;

State state_;
ServiceNameResolver serviceNameResolver_;
ClientConfiguration clientConfiguration_;
MemoryLimitController memoryLimitController_;

Expand All @@ -188,6 +192,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;

LookupServicePtr lookupServicePtr_;
std::unordered_map<std::string, LookupServicePtr> redirectedClusterLookupServicePtrs_;
ConnectionPool pool_;

uint64_t producerIdGenerator_;
Expand Down
4 changes: 2 additions & 2 deletions lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

HTTPLookupService::HTTPLookupService(ServiceNameResolver &serviceNameResolver,
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
serviceNameResolver_(serviceNameResolver),
serviceNameResolver_(serviceUrl),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
Expand Down
6 changes: 4 additions & 2 deletions lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
typedef Promise<Result, LookupDataResultPtr> LookupPromise;

ExecutorServiceProviderPtr executorProvider_;
ServiceNameResolver& serviceNameResolver_;
ServiceNameResolver serviceNameResolver_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;
const int maxLookupRedirects_;
Expand All @@ -64,7 +64,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);

public:
HTTPLookupService(ServiceNameResolver&, const ClientConfiguration&, const AuthenticationPtr&);
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);

LookupResultFuture getBroker(const TopicName& topicName) override;

Expand All @@ -74,6 +74,8 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override;

// Returns a mutable reference to the ServiceNameResolver member that this lookup service
// initializes from the service URL passed to its constructor.
ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; }
};
} // namespace pulsar

Expand Down
16 changes: 13 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
epoch_(0),
timer_(executor_->createDeadlineTimer()),
creationTimer_(executor_->createDeadlineTimer()),
reconnectionPending_(false) {}
reconnectionPending_(false),
redirectedClusterURI_("") {}

HandlerBase::~HandlerBase() {
ASIO_ERROR ignored;
Expand Down Expand Up @@ -88,9 +89,9 @@ void HandlerBase::grabCnx() { grabCnx(boost::none); }
Future<Result, ClientConnectionPtr> HandlerBase::getConnection(
const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) {
if (assignedBrokerUrl && client->getLookupCount() > 0) {
return client->connect(assignedBrokerUrl.get(), connectionKeySuffix_);
return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_);
} else {
return client->getConnection(topic(), connectionKeySuffix_);
return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_);
}
}

Expand Down Expand Up @@ -209,4 +210,13 @@ Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimest
}
}

/**
 * Record the service URL of the cluster this handler has been redirected to.
 *
 * The stored value is updated while holding the handler mutex.
 *
 * @param serviceUrl service URL of the redirected cluster
 */
void HandlerBase::setRedirectedClusterURI(const std::string& serviceUrl) {
    Lock guard(mutex_);
    redirectedClusterURI_.assign(serviceUrl);
}
/**
 * Return the service URL of the redirected cluster stored on this handler.
 *
 * NOTE(review): the mutex is held only while the reference is formed; the caller reads
 * the string after the lock has been released, so a concurrent
 * setRedirectedClusterURI() is not excluded. Returning by value would close that
 * window but requires changing the declaration as well.
 *
 * @return reference to the stored redirected cluster URI
 */
const std::string& HandlerBase::getRedirectedClusterURI() {
    Lock guard(mutex_);
    const std::string& uri = redirectedClusterURI_;
    return uri;
}

} // namespace pulsar
Loading

0 comments on commit 543e51c

Please sign in to comment.