Skip to content

Commit 25ea451

Browse files
[PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)
* [PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client * fix format * fix const def * Fix format check --------- Co-authored-by: Yunze Xu <xyzinfernity@163.com>
1 parent e04686d commit 25ea451

File tree

6 files changed

+82
-4
lines changed

6 files changed

+82
-4
lines changed

include/pulsar/ClientConfiguration.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration {
3232
~ClientConfiguration();
3333
ClientConfiguration(const ClientConfiguration&);
3434
ClientConfiguration& operator=(const ClientConfiguration&);
35+
enum ProxyProtocol
36+
{
37+
SNI = 0
38+
};
3539

3640
/**
3741
* Configure a limit on the amount of memory that will be allocated by this client instance.
@@ -320,6 +324,33 @@ class PULSAR_PUBLIC ClientConfiguration {
320324
*/
321325
ClientConfiguration& setConnectionTimeout(int timeoutMs);
322326

327+
/**
328+
* Set proxy-service url when client would like to connect to broker via proxy. Client must configure both
329+
* proxyServiceUrl and appropriate proxyProtocol.
330+
*
331+
* Example: pulsar+ssl://ats-proxy.example.com:4443
332+
*
333+
* @param proxyServiceUrl proxy url to connect with broker
334+
* @return
335+
*/
336+
ClientConfiguration& setProxyServiceUrl(const std::string& proxyServiceUrl);
337+
338+
const std::string& getProxyServiceUrl() const;
339+
340+
/**
341+
* Set appropriate proxy-protocol along with proxy-service url. Currently Pulsar supports SNI proxy
342+
* routing.
343+
*
344+
* SNI routing:
345+
* https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
346+
*
347+
* @param proxyProtocol possible options (SNI)
348+
* @return
349+
*/
350+
ClientConfiguration& setProxyProtocol(ProxyProtocol proxyProtocol);
351+
352+
ProxyProtocol getProxyProtocol() const;
353+
323354
/**
324355
* The getter associated with setConnectionTimeout().
325356
*/

lib/ClientConfiguration.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurr
134134
return *this;
135135
}
136136

137+
ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const std::string& proxyServiceUrl) {
138+
impl_->proxyServiceUrl = proxyServiceUrl;
139+
return *this;
140+
}
141+
142+
const std::string& ClientConfiguration::getProxyServiceUrl() const { return impl_->proxyServiceUrl; }
143+
144+
ClientConfiguration& ClientConfiguration::setProxyProtocol(ClientConfiguration::ProxyProtocol proxyProtocol) {
145+
impl_->proxyProtocol = proxyProtocol;
146+
return *this;
147+
}
148+
149+
ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol() const {
150+
return impl_->proxyProtocol;
151+
}
152+
137153
int ClientConfiguration::getConcurrentLookupRequest() const { return impl_->concurrentLookupRequest; }
138154

139155
ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int maxLookupRedirects) {

lib/ClientConfigurationImpl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ struct ClientConfigurationImpl {
4646
std::string listenerName;
4747
int connectionTimeoutMs{10000}; // 10 seconds
4848
std::string description;
49+
std::string proxyServiceUrl;
50+
ClientConfiguration::ProxyProtocol proxyProtocol;
4951

5052
std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
5153
};

lib/ClientConnection.cc

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,15 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
209209
boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client);
210210
#endif
211211
Url serviceUrl;
212+
Url proxyUrl;
212213
Url::parse(physicalAddress, serviceUrl);
214+
proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
215+
proxyProtocol_ = clientConfiguration.getProxyProtocol();
216+
if (proxyProtocol_ == ClientConfiguration::SNI && !proxyServiceUrl_.empty()) {
217+
Url::parse(proxyServiceUrl_, proxyUrl);
218+
isSniProxy_ = true;
219+
LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
220+
}
213221
if (clientConfiguration.isTlsAllowInsecureConnection()) {
214222
ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
215223
isTlsAllowInsecureConnection_ = true;
@@ -257,7 +265,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
257265

258266
if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
259267
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
260-
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(serviceUrl.host()));
268+
std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
269+
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
261270
}
262271

263272
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -403,7 +412,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
403412
if (logicalAddress_ == physicalAddress_) {
404413
LOG_INFO(cnxString_ << "Connected to broker");
405414
} else {
406-
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
415+
LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
416+
<< ", proxy: " << proxyServiceUrl_);
407417
}
408418

409419
Lock lock(mutex_);
@@ -572,7 +582,8 @@ void ClientConnection::tcpConnectAsync() {
572582

573583
boost::system::error_code err;
574584
Url service_url;
575-
if (!Url::parse(physicalAddress_, service_url)) {
585+
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
586+
if (!Url::parse(hostUrl, service_url)) {
576587
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
577588
close();
578589
return;
@@ -600,7 +611,8 @@ void ClientConnection::tcpConnectAsync() {
600611
void ClientConnection::handleResolve(const boost::system::error_code& err,
601612
tcp::resolver::iterator endpointIterator) {
602613
if (err) {
603-
LOG_ERROR(cnxString_ << "Resolve error: " << err << " : " << err.message());
614+
std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
615+
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
604616
close();
605617
return;
606618
}

lib/ClientConnection.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
333333
*/
334334
const std::string physicalAddress_;
335335

336+
std::string proxyServiceUrl_;
337+
338+
ClientConfiguration::ProxyProtocol proxyProtocol_;
339+
336340
// Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
337341
std::string cnxString_;
338342

@@ -384,6 +388,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
384388

385389
// Signals whether we're waiting for a response from broker
386390
bool havePendingPingRequest_ = false;
391+
bool isSniProxy_ = false;
387392
DeadlineTimerPtr keepAliveTimer_;
388393
DeadlineTimerPtr consumerStatsRequestTimer_;
389394

tests/ConsumerTest.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,4 +1478,16 @@ TEST(ConsumerTest, testConsumerName) {
14781478
client.close();
14791479
}
14801480

1481+
TEST(ConsumerTest, testSNIProxyConnect) {
1482+
ClientConfiguration clientConfiguration;
1483+
clientConfiguration.setProxyServiceUrl(lookupUrl);
1484+
clientConfiguration.setProxyProtocol(ClientConfiguration::SNI);
1485+
1486+
Client client(lookupUrl, clientConfiguration);
1487+
const std::string topic = "testSNIProxy-" + std::to_string(time(nullptr));
1488+
1489+
Consumer consumer;
1490+
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
1491+
client.close();
1492+
}
14811493
} // namespace pulsar

0 commit comments

Comments
 (0)