From fba2a94ee7e3909783d2644cb15f258b50a6b046 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Wed, 3 Jun 2020 19:10:45 -0700 Subject: [PATCH] [PIP-60] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar (#6566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation Implementation of [PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing) A proxy server is a go‑between or intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server can act as a “traffic cop,” in both forward and reverse proxy scenarios, and adds various benefits in your system such as load balancing, performance, security, auto-scaling, etc.. There are already many proxy servers already available in the market which are fast, scalable and more importantly covers various essential security aspects that are needed by the large organization to securely share their confidential data over the network. Pulsar already provides proxy implementation PIP-1 which acts as a reverse proxy and creates a gateway in front of brokers. However, pulsar doesn’t provide support to use other proxies such as Apache traffic server (ATS), HAProxy, Nginx, Envoy those are more scalable and secured. Most of these proxy-servers support SNI ROUTING which can route traffic to a destination without having to terminate the SSL connection. Routing at layer 4 gives greater transparency because the outbound connection is determined by examining the destination address in the client TCP packets. [Netty supports sending SNI header on TLS handshake](https://github.com/netty/netty/issues/3801#issuecomment-104274440) and this PR uses that Netty feature to send SNI header while connecting to proxy. ### Modification https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing#changes **Note:** we have fully tested this changes with ATS proxy for both forward and reverse proxy scenarios. And I have also shared e2e example in PIP to use ATS proxy for client and broker integration. --- .../pulsar/broker/service/BrokerService.java | 7 +- .../pulsar/client/api/ProxyProtocolTest.java | 109 ++++++++++++++++++ .../pulsar/client/api/ClientBuilder.java | 10 ++ .../pulsar/client/api/ProxyProtocol.java | 31 +++++ .../client/api/PulsarClientException.java | 51 +++++++- .../apache/pulsar/admin/cli/CmdClusters.java | 10 +- .../pulsar/client/cli/PulsarClientTool.java | 14 +++ .../pulsar/client/impl/ClientBuilderImpl.java | 11 ++ .../pulsar/client/impl/ConnectionPool.java | 55 +++++++-- .../client/impl/PulsarChannelInitializer.java | 18 ++- .../impl/conf/ClientConfigurationData.java | 5 + .../common/policies/data/ClusterData.java | 47 +++++++- 12 files changed, 351 insertions(+), 17 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 54facc4b703ff..0790e7399a7f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -861,7 +862,11 @@ public PulsarClient getReplicationClient(String cluster) { clientBuilder.serviceUrl( isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl()); } - + if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) { + clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol()); + log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(), + data.getProxyProtocol()); + } // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); return new PulsarClientImpl(conf, workerGroup); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java new file mode 100644 index 0000000000000..964ebd93f4a42 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import lombok.Cleanup; + +public class ProxyProtocolTest extends TlsProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class); + + @Test + public void testSniProxyProtocol() throws Exception { + + // Client should try to connect to proxy and pass broker-url as SNI header + String proxyUrl = pulsar.getBrokerServiceUrlTls(); + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + // should be able to create producer successfully + pulsarClient.newProducer().topic(topicName).create(); + } + + @Test + public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception { + + // Client should try to connect to proxy and pass broker-url as SNI header + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String proxyHost = "invalid-url"; + String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("should have failed due to invalid url"); + } catch (PulsarClientException e) { + assertTrue(e.getMessage().contains(proxyHost)); + } + } + + @Test + public void testSniProxyProtocolWithoutTls() throws Exception { + // Client should try to connect to proxy and pass broker-url as SNI header + String proxyUrl = pulsar.getBrokerServiceUrl(); + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("should have failed due to non-tls url"); + } catch (PulsarClientException e) { + // Ok + } + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index b3aee5b457b55..865855c359b13 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -468,4 +468,14 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @return the client builder instance */ ClientBuilder clock(Clock clock); + + /** + * Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing + * using {@link ProxyProtocol}. + * + * @param proxyServiceUrl proxy service url + * @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing + * @return + */ + ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java new file mode 100644 index 0000000000000..470e4754aa4f1 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Protcol type to determine type of proxy routing when client connects to proxy using + * {@link ClientBuilder::proxyServiceUrl}. + */ +public enum ProxyProtocol { + /** + * Follows SNI-routing + * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing. + **/ + SNI +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 16af0095c0515..b6e327c1bd209 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -68,12 +68,25 @@ public PulsarClientException(Throwable t) { /** * Constructs an {@code PulsarClientException} with the specified cause. * + * @param msg + * The detail message (which is saved for later retrieval by the {@link #getMessage()} method) + * * @param t - * The cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is permitted, - * and indicates that the cause is nonexistent or unknown.) + * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or unknown.) + */ + public PulsarClientException(String msg, Throwable t) { + super(msg, t); + } + + /** + * Constructs an {@code PulsarClientException} with the specified cause. + * + * @param t + * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or unknown.) * @param sequenceId - * The sequenceId of the message + * The sequenceId of the message */ public PulsarClientException(Throwable t, long sequenceId) { super(t); @@ -95,6 +108,21 @@ public static class InvalidServiceURL extends PulsarClientException { public InvalidServiceURL(Throwable t) { super(t); } + + /** + * Constructs an {@code InvalidServiceURL} with the specified cause. + * + *@param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * @param t + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public InvalidServiceURL(String msg, Throwable t) { + super(msg, t); + } } /** @@ -123,6 +151,21 @@ public InvalidConfigurationException(String msg) { public InvalidConfigurationException(Throwable t) { super(t); } + + /** + * Constructs an {@code InvalidConfigurationException} with the specified cause. + * + *@param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * @param t + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public InvalidConfigurationException(String msg, Throwable t) { + super(msg, t); + } } /** diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 88f3b3f04b120..75f80ee4235ff 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; @@ -69,10 +70,17 @@ private class Create extends CliCommand { @Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false) private String brokerServiceUrlTls; + @Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false) + private String proxyServiceUrl; + + @Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false) + private ProxyProtocol proxyProtocol; + void run() throws PulsarAdminException { String cluster = getOneArgument(params); admin.clusters().createCluster(cluster, - new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls)); + new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl, + proxyProtocol)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index b86bc79f5c1dc..0ba722771655d 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; @@ -43,6 +44,12 @@ public class PulsarClientTool { @Parameter(names = { "--url" }, description = "Broker URL to which to connect.") String serviceURL = null; + @Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.") + String proxyServiceURL = null; + + @Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.") + ProxyProtocol proxyProtocol = null; + @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.") String authPluginClassName = null; @@ -117,6 +124,13 @@ private void updateConfig() throws UnsupportedAuthenticationException { .tlsTrustStorePath(tlsTrustStorePath) .tlsTrustStorePassword(tlsTrustStorePassword); + if (StringUtils.isNotBlank(proxyServiceURL)) { + if (proxyProtocol == null) { + System.out.println("proxy-protocol must be provided with proxy-url"); + System.exit(-1); + } + clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol); + } this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL); this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 2c920e25250b5..6b820b990b9e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; @@ -294,4 +295,14 @@ public ClientBuilder clock(Clock clock) { conf.setClock(clock); return this; } + + @Override + public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) { + if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) { + throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl"); + } + conf.setProxyServiceUrl(proxyServiceUrl); + conf.setProxyProtocol(proxyProtocol); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 30301bbd8f398..fff9858be64b5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -34,6 +35,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -43,9 +47,12 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +61,8 @@ public class ConnectionPool implements Closeable { protected final ConcurrentHashMap>> pool; private final Bootstrap bootstrap; + private final PulsarChannelInitializer channelInitializerHandler; + private final ClientConfigurationData clientConfig; private final EventLoopGroup eventLoopGroup; private final int maxConnectionsPerHosts; @@ -66,6 +75,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier clientCnxSupplier) throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; + this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); pool = new ConcurrentHashMap<>(); @@ -78,7 +88,8 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { - bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier)); + channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); + bootstrap.handler(channelInitializerHandler); } catch (Exception e) { log.error("Failed to create channel initializer"); throw new PulsarClientException(e); @@ -214,10 +225,17 @@ private CompletableFuture createConnection(InetSocketAddress logicalA private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { String hostname = unresolvedAddress.getHostString(); int port = unresolvedAddress.getPort(); - - // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds - return resolveName(hostname) - .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port)); + try { + // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds + CompletableFuture> resolvedAddress = isSniProxy() + ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname))) + : resolveName(hostname); + return resolvedAddress + .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port)); + } catch (UnknownHostException e) { + log.error("Invalid remote url {}", hostname, e); + return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e)); + } } /** @@ -227,7 +245,7 @@ private CompletableFuture createConnection(InetSocketAddress unresolved private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, int port) { CompletableFuture future = new CompletableFuture<>(); - connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> { + connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> { // Successfully connected to server future.complete(channel); }).exceptionally(exception -> { @@ -266,9 +284,27 @@ CompletableFuture> resolveName(String hostname) { /** * Attempt to establish a TCP connection to an already resolved single IP address */ - private CompletableFuture connectToAddress(InetAddress ipAddress, int port) { + private CompletableFuture connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) { CompletableFuture future = new CompletableFuture<>(); + if (!ignoreProxyUrl && isSniProxy()) { + // client wants to connect to proxy and wants to pass + // target connection host in sni header + channelInitializerHandler.setSniHostName(ipAddress.getHostName()); + channelInitializerHandler.setSniHostPort(port); + // connect to proxy host + try { + URI proxyURI = new URI(clientConfig.getProxyServiceUrl()); + // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host + // will be already resolved + return resolveName(proxyURI.getHost()) + .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true)); + } catch (URISyntaxException e) { + log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e); + future.completeExceptionally(e); + return future; + } + } bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> { if (channelFuture.isSuccess()) { future.complete(channelFuture.channel()); @@ -307,5 +343,10 @@ public static int signSafeMod(long dividend, int divisor) { return mod; } + private boolean isSniProxy() { + return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null + && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl()); + } + private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 4a145e7410126..e418904a19a21 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -27,6 +27,10 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import lombok.Getter; +import lombok.Setter; + +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ObjectCache; @@ -41,11 +45,16 @@ public class PulsarChannelInitializer extends ChannelInitializer public static final String TLS_HANDLER = "tls"; private final Supplier clientCnxSupplier; + @Getter private final boolean tlsEnabled; private final boolean tlsEnabledWithKeyStore; private final Supplier sslContextSupplier; private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; + @Setter + private String sniHostName; + @Setter + private int sniHostPort; private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1); @@ -99,9 +108,12 @@ public void initChannel(SocketChannel ch) throws Exception { if (tlsEnabledWithKeyStore) { ch.pipeline().addLast(TLS_HANDLER, new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine())); - } else { - ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc())); - } + } else { + SslHandler handler = StringUtils.isNotBlank(sniHostName) + ? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort) + : sslContextSupplier.get().newHandler(ch.alloc()); + ch.pipeline().addLast(TLS_HANDLER, handler); + } ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); } else { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index ac08553b48f58..7bedc01e26ac5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -26,6 +26,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; @@ -87,6 +88,10 @@ public class ClientConfigurationData implements Serializable, Cloneable { private Set tlsCiphers = Sets.newTreeSet(); private Set tlsProtocols = Sets.newTreeSet(); + /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/ + private String proxyServiceUrl; + private ProxyProtocol proxyProtocol; + @JsonIgnore private Clock clock = Clock.systemDefaultZone(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java index e5b2859cd1468..35cecbbfd90dd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java @@ -26,6 +26,8 @@ import java.util.LinkedHashSet; import java.util.Objects; +import org.apache.pulsar.client.api.ProxyProtocol; + /** * The configuration data for a cluster. */ @@ -58,6 +60,20 @@ public class ClusterData { example = "pulsar+ssl://pulsar.example.com:6651" ) private String brokerServiceUrlTls; + @ApiModelProperty( + name = "proxyServiceUrl", + value = "Proxy-service url when client would like to connect to broker via proxy.", + example = "pulsar+ssl://ats-proxy.example.com:4443 or " + + "pulsar://ats-proxy.example.com:4080" + ) + private String proxyServiceUrl; + @ApiModelProperty( + name = "proxyProtocol", + value = "protocol to decide type of proxy routing eg: SNI-routing", + example = "SNI" + ) + private ProxyProtocol proxyProtocol; + // For given Cluster1(us-west1, us-east1) and Cluster2(us-west2, us-east2) // Peer: [us-west1 -> us-west2] and [us-east1 -> us-east2] @ApiModelProperty( @@ -85,6 +101,16 @@ public ClusterData(String serviceUrl, String serviceUrlTls, String brokerService this.brokerServiceUrlTls = brokerServiceUrlTls; } + public ClusterData(String serviceUrl, String serviceUrlTls, String brokerServiceUrl, String brokerServiceUrlTls, + String proxyServiceUrl, ProxyProtocol proxyProtocol) { + this.serviceUrl = serviceUrl; + this.serviceUrlTls = serviceUrlTls; + this.brokerServiceUrl = brokerServiceUrl; + this.brokerServiceUrlTls = brokerServiceUrlTls; + this.proxyServiceUrl = proxyServiceUrl; + this.proxyProtocol = proxyProtocol; + } + public void update(ClusterData other) { checkNotNull(other); this.serviceUrl = other.serviceUrl; @@ -125,6 +151,22 @@ public void setBrokerServiceUrlTls(String brokerServiceUrlTls) { this.brokerServiceUrlTls = brokerServiceUrlTls; } + public String getProxyServiceUrl() { + return proxyServiceUrl; + } + + public void setProxyServiceUrl(String proxyServiceUrl) { + this.proxyServiceUrl = proxyServiceUrl; + } + + public ProxyProtocol getProxyProtocol() { + return proxyProtocol; + } + + public void setProxyProtocol(ProxyProtocol proxyProtocol) { + this.proxyProtocol = proxyProtocol; + } + public LinkedHashSet getPeerClusterNames() { return peerClusterNames; } @@ -139,7 +181,9 @@ public boolean equals(Object obj) { ClusterData other = (ClusterData) obj; return Objects.equals(serviceUrl, other.serviceUrl) && Objects.equals(serviceUrlTls, other.serviceUrlTls) && Objects.equals(brokerServiceUrl, other.brokerServiceUrl) - && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls); + && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls) + && Objects.equals(proxyServiceUrl, other.proxyServiceUrl) + && Objects.equals(proxyProtocol, other.proxyProtocol); } return false; @@ -154,6 +198,7 @@ public int hashCode() { public String toString() { return MoreObjects.toStringHelper(this).add("serviceUrl", serviceUrl).add("serviceUrlTls", serviceUrlTls) .add("brokerServiceUrl", brokerServiceUrl).add("brokerServiceUrlTls", brokerServiceUrlTls) + .add("proxyServiceUrl", proxyServiceUrl).add("proxyProtocol", proxyProtocol) .add("peerClusterNames", peerClusterNames).toString(); }