Skip to content

Commit

Permalink
[PIP-60] [Proxy-Server] Support SNI routing to support various proxy-…
Browse files Browse the repository at this point in the history
…server in pulsar (apache#6566)

### Motivation
Implementation of [PIP-60](https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing)
A proxy server is a go‑between or intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server can act as a “traffic cop,” in both forward and reverse proxy scenarios, and adds various benefits in your system such as load balancing, performance, security, auto-scaling, etc.. There are already many proxy servers already available in the market which are fast, scalable and more importantly covers various essential security aspects that are needed by the large organization to securely share their confidential data over the network. Pulsar already provides proxy implementation PIP-1 which acts as a reverse proxy and creates a gateway in front of brokers. However, pulsar doesn’t provide support to use other proxies such as Apache traffic server (ATS), HAProxy, Nginx, Envoy those are more scalable and secured. Most of these proxy-servers support SNI ROUTING which can route traffic to a destination without having to terminate the SSL connection. Routing at layer 4 gives greater transparency because the outbound connection is determined by examining the destination address in the client TCP packets.
[Netty supports sending SNI header on TLS handshake](netty/netty#3801 (comment)) and this PR uses that Netty feature to send SNI header while connecting to proxy.

### Modification
https://github.com/apache/pulsar/wiki/PIP-60:-Support-Proxy-server-with-SNI-routing#changes

**Note:** we have fully tested this changes with ATS proxy for both forward and reverse proxy scenarios. And I have also shared e2e example in PIP to use ATS proxy for client and broker integration.
  • Loading branch information
rdhabalia authored and huangdx0726 committed Aug 24, 2020
1 parent 8730dbe commit fba2a94
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -861,7 +862,11 @@ public PulsarClient getReplicationClient(String cluster) {
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}

if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(),
data.getProxyProtocol());
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import lombok.Cleanup;

public class ProxyProtocolTest extends TlsProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class);

@Test
public void testSniProxyProtocol() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrlTls();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
}

@Test
public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String proxyHost = "invalid-url";
String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to invalid url");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains(proxyHost));
}
}

@Test
public void testSniProxyProtocolWithoutTls() throws Exception {
// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrl();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to non-tls url");
} catch (PulsarClientException e) {
// Ok
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,14 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);

/**
* Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing
* using {@link ProxyProtocol}.
*
* @param proxyServiceUrl proxy service url
* @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing
* @return
*/
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Protcol type to determine type of proxy routing when client connects to proxy using
* {@link ClientBuilder::proxyServiceUrl}.
*/
public enum ProxyProtocol {
/**
* Follows SNI-routing
* https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
**/
SNI
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,25 @@ public PulsarClientException(Throwable t) {
/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
* @param msg
* The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
*
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
* The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
* permitted, and indicates that the cause is nonexistent or unknown.)
*/
public PulsarClientException(String msg, Throwable t) {
super(msg, t);
}

/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
* @param t
* The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
* permitted, and indicates that the cause is nonexistent or unknown.)
* @param sequenceId
* The sequenceId of the message
* The sequenceId of the message
*/
public PulsarClientException(Throwable t, long sequenceId) {
super(t);
Expand All @@ -95,6 +108,21 @@ public static class InvalidServiceURL extends PulsarClientException {
public InvalidServiceURL(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidServiceURL} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidServiceURL(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down Expand Up @@ -123,6 +151,21 @@ public InvalidConfigurationException(String msg) {
public InvalidConfigurationException(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidConfigurationException} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidConfigurationException(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;

Expand Down Expand Up @@ -69,10 +70,17 @@ private class Create extends CliCommand {
@Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
private String brokerServiceUrlTls;

@Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
private String proxyServiceUrl;

@Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
private ProxyProtocol proxyProtocol;

void run() throws PulsarAdminException {
String cluster = getOneArgument(params);
admin.clusters().createCluster(cluster,
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
proxyProtocol));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;

Expand All @@ -43,6 +44,12 @@ public class PulsarClientTool {
@Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
String serviceURL = null;

@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
String proxyServiceURL = null;

@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
ProxyProtocol proxyProtocol = null;

@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;

Expand Down Expand Up @@ -117,6 +124,13 @@ private void updateConfig() throws UnsupportedAuthenticationException {
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustStorePassword(tlsTrustStorePassword);

if (StringUtils.isNotBlank(proxyServiceURL)) {
if (proxyProtocol == null) {
System.out.println("proxy-protocol must be provided with proxy-url");
System.exit(-1);
}
clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
}
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
Expand Down Expand Up @@ -294,4 +295,14 @@ public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
return this;
}

@Override
public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) {
throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl");
}
conf.setProxyServiceUrl(proxyServiceUrl);
conf.setProxyProtocol(proxyProtocol);
return this;
}
}
Loading

0 comments on commit fba2a94

Please sign in to comment.