Skip to content

Commit

Permalink
[Proxy-Server] Support SNI routing to support various proxy-server in…
Browse files Browse the repository at this point in the history
… pulsar

addressed comments

fix tls handler
  • Loading branch information
rdhabalia committed Jun 2, 2020
1 parent c64b22a commit 2a6628a
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -861,7 +862,11 @@ public PulsarClient getReplicationClient(String cluster) {
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}

if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(),
data.getProxyProtocol());
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import lombok.Cleanup;

public class ProxyProtocolTest extends TlsProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class);

@Test
public void testSniProxyProtocol() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrlTls();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
}

@Test
public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String proxyHost = "invalid-url";
String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to invalid url");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains(proxyHost));
}
}

@Test
public void testSniProxyProtocolWithoutTls() throws Exception {
// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrl();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);

@Cleanup
PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to non-tls url");
} catch (PulsarClientException e) {
// Ok
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,4 +460,14 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);

/**
* Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing
* using {@link ProxyProtocol}.
*
* @param proxyServiceUrl proxy service url
* @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing
* @return
*/
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Protcol type to determine type of proxy routing when client connects to proxy using
* {@link ClientBuilder::proxyServiceUrl}.
*/
public enum ProxyProtocol {
/**
* Follows SNI-routing
* https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
**/
SNI
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,25 @@ public PulsarClientException(Throwable t) {
/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
* @param msg
* The detail message (which is saved for later retrieval by the {@link #getMessage()} method)
*
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
* The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
* permitted, and indicates that the cause is nonexistent or unknown.)
*/
public PulsarClientException(String msg, Throwable t) {
super(msg, t);
}

/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
* @param t
* The cause (which is saved for later retrieval by the {@link #getCause()} method). (A null value is
* permitted, and indicates that the cause is nonexistent or unknown.)
* @param sequenceId
* The sequenceId of the message
* The sequenceId of the message
*/
public PulsarClientException(Throwable t, long sequenceId) {
super(t);
Expand All @@ -95,6 +108,21 @@ public static class InvalidServiceURL extends PulsarClientException {
public InvalidServiceURL(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidServiceURL} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidServiceURL(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down Expand Up @@ -123,6 +151,21 @@ public InvalidConfigurationException(String msg) {
public InvalidConfigurationException(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidConfigurationException} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidConfigurationException(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;

Expand Down Expand Up @@ -69,10 +70,17 @@ private class Create extends CliCommand {
@Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
private String brokerServiceUrlTls;

@Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
private String proxyServiceUrl;

@Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
private ProxyProtocol proxyProtocol;

void run() throws PulsarAdminException {
String cluster = getOneArgument(params);
admin.clusters().createCluster(cluster,
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
proxyProtocol));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;

Expand All @@ -43,6 +44,12 @@ public class PulsarClientTool {
@Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
String serviceURL = null;

@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
String proxyServiceURL = null;

@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
ProxyProtocol proxyProtocol = null;

@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;

Expand Down Expand Up @@ -117,6 +124,13 @@ private void updateConfig() throws UnsupportedAuthenticationException {
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustStorePassword(tlsTrustStorePassword);

if (StringUtils.isNotBlank(proxyServiceURL)) {
if (proxyProtocol == null) {
System.out.println("proxy-protocol must be provided with proxy-url");
System.exit(-1);
}
clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
}
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
Expand Down Expand Up @@ -285,4 +286,14 @@ public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
return this;
}

@Override
public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) {
throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl");
}
conf.setProxyServiceUrl(proxyServiceUrl);
conf.setProxyProtocol(proxyProtocol);
return this;
}
}
Loading

0 comments on commit 2a6628a

Please sign in to comment.