From 2a6628a50706092368f314a7e97f781a2cdc4ab0 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 18 Mar 2020 13:25:51 -0700 Subject: [PATCH] [Proxy-Server] Support SNI routing to support various proxy-server in pulsar addressed comments fix tls handler --- .../pulsar/broker/service/BrokerService.java | 7 +- .../pulsar/client/api/ProxyProtocolTest.java | 109 ++++++++++++++++++ .../pulsar/client/api/ClientBuilder.java | 10 ++ .../pulsar/client/api/ProxyProtocol.java | 31 +++++ .../client/api/PulsarClientException.java | 51 +++++++- .../apache/pulsar/admin/cli/CmdClusters.java | 10 +- .../pulsar/client/cli/PulsarClientTool.java | 14 +++ .../pulsar/client/impl/ClientBuilderImpl.java | 11 ++ .../pulsar/client/impl/ConnectionPool.java | 55 +++++++-- .../client/impl/PulsarChannelInitializer.java | 18 ++- .../impl/conf/ClientConfigurationData.java | 5 + .../common/policies/data/ClusterData.java | 47 +++++++- 12 files changed, 351 insertions(+), 17 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 491ef7fae9345a..205928effde6e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -861,7 +862,11 @@ public PulsarClient getReplicationClient(String cluster) { clientBuilder.serviceUrl( isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl()); } - + if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) { + clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol()); + log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(), + data.getProxyProtocol()); + } // Share all the IO threads across broker and client connections ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); return new PulsarClientImpl(conf, workerGroup); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java new file mode 100644 index 00000000000000..964ebd93f4a42e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import lombok.Cleanup; + +public class ProxyProtocolTest extends TlsProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class); + + @Test + public void testSniProxyProtocol() throws Exception { + + // Client should try to connect to proxy and pass broker-url as SNI header + String proxyUrl = pulsar.getBrokerServiceUrlTls(); + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + // should be able to create producer successfully + pulsarClient.newProducer().topic(topicName).create(); + } + + @Test + public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception { + + // Client should try to connect to proxy and pass broker-url as SNI header + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String proxyHost = "invalid-url"; + String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("should have failed due to invalid url"); + } catch (PulsarClientException e) { + assertTrue(e.getMessage().contains(proxyHost)); + } + } + + @Test + public void testSniProxyProtocolWithoutTls() throws Exception { + // Client should try to connect to proxy and pass broker-url as SNI header + String proxyUrl = pulsar.getBrokerServiceUrl(); + String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) + .proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS); + + @Cleanup + PulsarClient pulsarClient = clientBuilder.build(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("should have failed due to non-tls url"); + } catch (PulsarClientException e) { + // Ok + } + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index c12e9a1f41f129..c890f32cbc1877 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -460,4 +460,14 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @return the client builder instance */ ClientBuilder clock(Clock clock); + + /** + * Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing + * using {@link ProxyProtocol}. + * + * @param proxyServiceUrl proxy service url + * @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing + * @return + */ + ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java new file mode 100644 index 00000000000000..470e4754aa4f1d --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProxyProtocol.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Protcol type to determine type of proxy routing when client connects to proxy using + * {@link ClientBuilder::proxyServiceUrl}. + */ +public enum ProxyProtocol { + /** + * Follows SNI-routing + * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing. + **/ + SNI +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 16af0095c0515a..b6e327c1bd2095 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -68,12 +68,25 @@ public PulsarClientException(Throwable t) { /** * Constructs an {@code PulsarClientException} with the specified cause. * + * @param msg + * The detail message (which is saved for later retrieval by the {@link #getMessage()} method) + * * @param t - * The cause (which is saved for later retrieval by the - * {@link #getCause()} method). (A null value is permitted, - * and indicates that the cause is nonexistent or unknown.) + * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or unknown.) + */ + public PulsarClientException(String msg, Throwable t) { + super(msg, t); + } + + /** + * Constructs an {@code PulsarClientException} with the specified cause. + * + * @param t + * The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is + * permitted, and indicates that the cause is nonexistent or unknown.) * @param sequenceId - * The sequenceId of the message + * The sequenceId of the message */ public PulsarClientException(Throwable t, long sequenceId) { super(t); @@ -95,6 +108,21 @@ public static class InvalidServiceURL extends PulsarClientException { public InvalidServiceURL(Throwable t) { super(t); } + + /** + * Constructs an {@code InvalidServiceURL} with the specified cause. + * + *@param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * @param t + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public InvalidServiceURL(String msg, Throwable t) { + super(msg, t); + } } /** @@ -123,6 +151,21 @@ public InvalidConfigurationException(String msg) { public InvalidConfigurationException(Throwable t) { super(t); } + + /** + * Constructs an {@code InvalidConfigurationException} with the specified cause. + * + *@param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * @param t + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public InvalidConfigurationException(String msg, Throwable t) { + super(msg, t); + } } /** diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 88f3b3f04b120d..75f80ee4235ff9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -24,6 +24,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; @@ -69,10 +70,17 @@ private class Create extends CliCommand { @Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false) private String brokerServiceUrlTls; + @Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false) + private String proxyServiceUrl; + + @Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false) + private ProxyProtocol proxyProtocol; + void run() throws PulsarAdminException { String cluster = getOneArgument(params); admin.clusters().createCluster(cluster, - new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls)); + new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl, + proxyProtocol)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index b86bc79f5c1dc0..0ba722771655d8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; @@ -43,6 +44,12 @@ public class PulsarClientTool { @Parameter(names = { "--url" }, description = "Broker URL to which to connect.") String serviceURL = null; + @Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.") + String proxyServiceURL = null; + + @Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.") + ProxyProtocol proxyProtocol = null; + @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.") String authPluginClassName = null; @@ -117,6 +124,13 @@ private void updateConfig() throws UnsupportedAuthenticationException { .tlsTrustStorePath(tlsTrustStorePath) .tlsTrustStorePassword(tlsTrustStorePassword); + if (StringUtils.isNotBlank(proxyServiceURL)) { + if (proxyProtocol == null) { + System.out.println("proxy-protocol must be provided with proxy-url"); + System.exit(-1); + } + clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol); + } this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL); this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 7a56e42a3fff42..ded6fb38a96c4c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; @@ -285,4 +286,14 @@ public ClientBuilder clock(Clock clock) { conf.setClock(clock); return this; } + + @Override + public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) { + if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) { + throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl"); + } + conf.setProxyServiceUrl(proxyServiceUrl); + conf.setProxyProtocol(proxyProtocol); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 30301bbd8f3982..fff9858be64b59 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -34,6 +35,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -43,9 +47,12 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +61,8 @@ public class ConnectionPool implements Closeable { protected final ConcurrentHashMap>> pool; private final Bootstrap bootstrap; + private final PulsarChannelInitializer channelInitializerHandler; + private final ClientConfigurationData clientConfig; private final EventLoopGroup eventLoopGroup; private final int maxConnectionsPerHosts; @@ -66,6 +75,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier clientCnxSupplier) throws PulsarClientException { this.eventLoopGroup = eventLoopGroup; + this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); pool = new ConcurrentHashMap<>(); @@ -78,7 +88,8 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { - bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier)); + channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); + bootstrap.handler(channelInitializerHandler); } catch (Exception e) { log.error("Failed to create channel initializer"); throw new PulsarClientException(e); @@ -214,10 +225,17 @@ private CompletableFuture createConnection(InetSocketAddress logicalA private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { String hostname = unresolvedAddress.getHostString(); int port = unresolvedAddress.getPort(); - - // Resolve DNS --> Attempt to connect to all IP addresses until once succeeds - return resolveName(hostname) - .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port)); + try { + // For non-sni-proxy: Resolve DNS --> Attempt to connect to all IP addresses until once succeeds + CompletableFuture> resolvedAddress = isSniProxy() + ? CompletableFuture.completedFuture(Lists.newArrayList(InetAddress.getByName(hostname))) + : resolveName(hostname); + return resolvedAddress + .thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port)); + } catch (UnknownHostException e) { + log.error("Invalid remote url {}", hostname, e); + return FutureUtil.failedFuture(new InvalidServiceURL("Invalid url " + hostname, e)); + } } /** @@ -227,7 +245,7 @@ private CompletableFuture createConnection(InetSocketAddress unresolved private CompletableFuture connectToResolvedAddresses(Iterator unresolvedAddresses, int port) { CompletableFuture future = new CompletableFuture<>(); - connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> { + connectToAddress(unresolvedAddresses.next(), port, false).thenAccept(channel -> { // Successfully connected to server future.complete(channel); }).exceptionally(exception -> { @@ -266,9 +284,27 @@ CompletableFuture> resolveName(String hostname) { /** * Attempt to establish a TCP connection to an already resolved single IP address */ - private CompletableFuture connectToAddress(InetAddress ipAddress, int port) { + private CompletableFuture connectToAddress(InetAddress ipAddress, int port, boolean ignoreProxyUrl) { CompletableFuture future = new CompletableFuture<>(); + if (!ignoreProxyUrl && isSniProxy()) { + // client wants to connect to proxy and wants to pass + // target connection host in sni header + channelInitializerHandler.setSniHostName(ipAddress.getHostName()); + channelInitializerHandler.setSniHostPort(port); + // connect to proxy host + try { + URI proxyURI = new URI(clientConfig.getProxyServiceUrl()); + // resolve proxy host-address and try to connect again by passing flag ignoreProxyUrl because proxy-host + // will be already resolved + return resolveName(proxyURI.getHost()) + .thenCompose(inetAddresses -> connectToAddress(inetAddresses.iterator().next(), proxyURI.getPort(), true)); + } catch (URISyntaxException e) { + log.error("Failed to parse proxy-service url {}", clientConfig.getProxyServiceUrl(), e); + future.completeExceptionally(e); + return future; + } + } bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> { if (channelFuture.isSuccess()) { future.complete(channelFuture.channel()); @@ -307,5 +343,10 @@ public static int signSafeMod(long dividend, int divisor) { return mod; } + private boolean isSniProxy() { + return channelInitializerHandler.isTlsEnabled() && clientConfig.getProxyProtocol() != null + && StringUtils.isNotBlank(clientConfig.getProxyServiceUrl()); + } + private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 4a145e74101261..e418904a19a21e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -27,6 +27,10 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import lombok.Getter; +import lombok.Setter; + +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ObjectCache; @@ -41,11 +45,16 @@ public class PulsarChannelInitializer extends ChannelInitializer public static final String TLS_HANDLER = "tls"; private final Supplier clientCnxSupplier; + @Getter private final boolean tlsEnabled; private final boolean tlsEnabledWithKeyStore; private final Supplier sslContextSupplier; private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder; + @Setter + private String sniHostName; + @Setter + private int sniHostPort; private static final long TLS_CERTIFICATE_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(1); @@ -99,9 +108,12 @@ public void initChannel(SocketChannel ch) throws Exception { if (tlsEnabledWithKeyStore) { ch.pipeline().addLast(TLS_HANDLER, new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine())); - } else { - ch.pipeline().addLast(TLS_HANDLER, sslContextSupplier.get().newHandler(ch.alloc())); - } + } else { + SslHandler handler = StringUtils.isNotBlank(sniHostName) + ? sslContextSupplier.get().newHandler(ch.alloc(), sniHostName, sniHostPort) + : sslContextSupplier.get().newHandler(ch.alloc()); + ch.pipeline().addLast(TLS_HANDLER, handler); + } ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); } else { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index ce4cfe33c3d06b..38e96bf8348bbd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -26,6 +26,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; @@ -85,6 +86,10 @@ public class ClientConfigurationData implements Serializable, Cloneable { private Set tlsCiphers = Sets.newTreeSet(); private Set tlsProtocols = Sets.newTreeSet(); + /** proxyServiceUrl and proxyProtocol must be mutually inclusive **/ + private String proxyServiceUrl; + private ProxyProtocol proxyProtocol; + @JsonIgnore private Clock clock = Clock.systemDefaultZone(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java index e5b2859cd14682..35cecbbfd90ddb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java @@ -26,6 +26,8 @@ import java.util.LinkedHashSet; import java.util.Objects; +import org.apache.pulsar.client.api.ProxyProtocol; + /** * The configuration data for a cluster. */ @@ -58,6 +60,20 @@ public class ClusterData { example = "pulsar+ssl://pulsar.example.com:6651" ) private String brokerServiceUrlTls; + @ApiModelProperty( + name = "proxyServiceUrl", + value = "Proxy-service url when client would like to connect to broker via proxy.", + example = "pulsar+ssl://ats-proxy.example.com:4443 or " + + "pulsar://ats-proxy.example.com:4080" + ) + private String proxyServiceUrl; + @ApiModelProperty( + name = "proxyProtocol", + value = "protocol to decide type of proxy routing eg: SNI-routing", + example = "SNI" + ) + private ProxyProtocol proxyProtocol; + // For given Cluster1(us-west1, us-east1) and Cluster2(us-west2, us-east2) // Peer: [us-west1 -> us-west2] and [us-east1 -> us-east2] @ApiModelProperty( @@ -85,6 +101,16 @@ public ClusterData(String serviceUrl, String serviceUrlTls, String brokerService this.brokerServiceUrlTls = brokerServiceUrlTls; } + public ClusterData(String serviceUrl, String serviceUrlTls, String brokerServiceUrl, String brokerServiceUrlTls, + String proxyServiceUrl, ProxyProtocol proxyProtocol) { + this.serviceUrl = serviceUrl; + this.serviceUrlTls = serviceUrlTls; + this.brokerServiceUrl = brokerServiceUrl; + this.brokerServiceUrlTls = brokerServiceUrlTls; + this.proxyServiceUrl = proxyServiceUrl; + this.proxyProtocol = proxyProtocol; + } + public void update(ClusterData other) { checkNotNull(other); this.serviceUrl = other.serviceUrl; @@ -125,6 +151,22 @@ public void setBrokerServiceUrlTls(String brokerServiceUrlTls) { this.brokerServiceUrlTls = brokerServiceUrlTls; } + public String getProxyServiceUrl() { + return proxyServiceUrl; + } + + public void setProxyServiceUrl(String proxyServiceUrl) { + this.proxyServiceUrl = proxyServiceUrl; + } + + public ProxyProtocol getProxyProtocol() { + return proxyProtocol; + } + + public void setProxyProtocol(ProxyProtocol proxyProtocol) { + this.proxyProtocol = proxyProtocol; + } + public LinkedHashSet getPeerClusterNames() { return peerClusterNames; } @@ -139,7 +181,9 @@ public boolean equals(Object obj) { ClusterData other = (ClusterData) obj; return Objects.equals(serviceUrl, other.serviceUrl) && Objects.equals(serviceUrlTls, other.serviceUrlTls) && Objects.equals(brokerServiceUrl, other.brokerServiceUrl) - && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls); + && Objects.equals(brokerServiceUrlTls, other.brokerServiceUrlTls) + && Objects.equals(proxyServiceUrl, other.proxyServiceUrl) + && Objects.equals(proxyProtocol, other.proxyProtocol); } return false; @@ -154,6 +198,7 @@ public int hashCode() { public String toString() { return MoreObjects.toStringHelper(this).add("serviceUrl", serviceUrl).add("serviceUrlTls", serviceUrlTls) .add("brokerServiceUrl", brokerServiceUrl).add("brokerServiceUrlTls", brokerServiceUrlTls) + .add("proxyServiceUrl", proxyServiceUrl).add("proxyProtocol", proxyProtocol) .add("peerClusterNames", peerClusterNames).toString(); }