Skip to content

Commit

Permalink
[improve] PIP-337: Implement SSL Factory Plugin to customize SSL Cont…
Browse files Browse the repository at this point in the history
…ext and SSL Engine generation (#23110)

Co-authored-by: Apurva Telang <atelang@paypal.com>
  • Loading branch information
Apurva007 and Apurva Telang authored Aug 20, 2024
1 parent 9edaa85 commit 2d46bfa
Show file tree
Hide file tree
Showing 75 changed files with 2,264 additions and 1,302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -1581,6 +1582,15 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "Specify whether Client certificates are required for TLS Reject.\n"
+ "the Connection if the Client Certificate is not trusted")
private boolean tlsRequireTrustedClientCertOnConnect = false;
@FieldContext(
category = CATEGORY_TLS,
doc = "SSL Factory Plugin class to provide SSLEngine and SSLContext objects. The default "
+ " class used is DefaultSslFactory.")
private String sslFactoryPlugin = DefaultPulsarSslFactory.class.getName();
@FieldContext(
category = CATEGORY_TLS,
doc = "SSL Factory plugin configuration parameters.")
private String sslFactoryPluginParams = "";

/***** --- Authentication. --- ****/
@FieldContext(
Expand Down Expand Up @@ -3546,6 +3556,15 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
+ " used by the internal client to authenticate with Pulsar brokers"
)
private Set<String> brokerClientTlsProtocols = new TreeSet<>();
@FieldContext(
category = CATEGORY_TLS,
doc = "SSL Factory Plugin class used by internal client to provide SSLEngine and SSLContext objects. "
+ "The default class used is DefaultSslFactory.")
private String brokerClientSslFactoryPlugin = DefaultPulsarSslFactory.class.getName();
@FieldContext(
category = CATEGORY_TLS,
doc = "SSL Factory plugin configuration parameters used by internal client.")
private String brokerClientSslFactoryPluginParams = "";

/* packages management service configurations (begin) */

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import java.util.Set;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.DefaultSslContextBuilder;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.keystoretls.NetSslContextBuilder;
import org.eclipse.jetty.util.ssl.SslContextFactory;

@Slf4j
Expand All @@ -35,57 +33,21 @@ public class JettySslContextFactory {
}
}

public static SslContextFactory.Server createServerSslContextWithKeystore(String sslProviderString,
String keyStoreTypeString,
String keyStore,
String keyStorePassword,
boolean allowInsecureConnection,
String trustStoreTypeString,
String trustStore,
String trustStorePassword,
boolean requireTrustedClientCertOnConnect,
Set<String> ciphers,
Set<String> protocols,
long certRefreshInSec) {
NetSslContextBuilder sslCtxRefresher = new NetSslContextBuilder(
sslProviderString,
keyStoreTypeString,
keyStore,
keyStorePassword,
allowInsecureConnection,
trustStoreTypeString,
trustStore,
trustStorePassword,
requireTrustedClientCertOnConnect,
certRefreshInSec);

return new JettySslContextFactory.Server(sslProviderString, sslCtxRefresher,
public static SslContextFactory.Server createSslContextFactory(String sslProviderString,
PulsarSslFactory pulsarSslFactory,
boolean requireTrustedClientCertOnConnect,
Set<String> ciphers, Set<String> protocols) {
return new JettySslContextFactory.Server(sslProviderString, pulsarSslFactory,
requireTrustedClientCertOnConnect, ciphers, protocols);
}

public static SslContextFactory createServerSslContext(String sslProviderString, boolean tlsAllowInsecureConnection,
String tlsTrustCertsFilePath,
String tlsCertificateFilePath,
String tlsKeyFilePath,
boolean tlsRequireTrustedClientCertOnConnect,
Set<String> ciphers,
Set<String> protocols,
long certRefreshInSec) {
DefaultSslContextBuilder sslCtxRefresher =
new DefaultSslContextBuilder(tlsAllowInsecureConnection, tlsTrustCertsFilePath, tlsCertificateFilePath,
tlsKeyFilePath, tlsRequireTrustedClientCertOnConnect, certRefreshInSec, sslProviderString);

return new JettySslContextFactory.Server(sslProviderString, sslCtxRefresher,
tlsRequireTrustedClientCertOnConnect, ciphers, protocols);
}

private static class Server extends SslContextFactory.Server {
private final SslContextAutoRefreshBuilder<SSLContext> sslCtxRefresher;
private final PulsarSslFactory pulsarSslFactory;

public Server(String sslProviderString, SslContextAutoRefreshBuilder<SSLContext> sslCtxRefresher,
public Server(String sslProviderString, PulsarSslFactory pulsarSslFactory,
boolean requireTrustedClientCertOnConnect, Set<String> ciphers, Set<String> protocols) {
super();
this.sslCtxRefresher = sslCtxRefresher;
this.pulsarSslFactory = pulsarSslFactory;

if (ciphers != null && ciphers.size() > 0) {
this.setIncludeCipherSuites(ciphers.toArray(new String[0]));
Expand All @@ -110,7 +72,7 @@ public Server(String sslProviderString, SslContextAutoRefreshBuilder<SSLContext>

@Override
public SSLContext getSslContext() {
return sslCtxRefresher.get();
return this.pulsarSslFactory.getInternalSslContext();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.testng.annotations.Test;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;

@Slf4j
public class JettySslContextFactoryTest {
Expand All @@ -51,16 +54,20 @@ public void testJettyTlsServerTls() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
null,
false,
Resources.getResource("ssl/my-ca/ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-key.pem").getPath(),
true,
null,
null,
600);
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsTrustCertsFilePath(Resources.getResource("ssl/my-ca/ca.pem").getPath())
.tlsCertificateFilePath(Resources.getResource("ssl/my-ca/server-ca.pem").getPath())
.tlsKeyFilePath(Resources.getResource("ssl/my-ca/server-key.pem").getPath())
.allowInsecureConnection(false)
.requireTrustedClientCertOnConnect(true)
.tlsEnabledWithKeystore(false)
.isHttps(true)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true, null, null);

ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand All @@ -85,20 +92,30 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
null,
false,
Resources.getResource("ssl/my-ca/ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-key.pem").getPath(),
true,
null,
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsProtocols(new HashSet<String>() {
{
this.add("TLSv1.3");
}
})
.tlsTrustCertsFilePath(Resources.getResource("ssl/my-ca/ca.pem").getPath())
.tlsCertificateFilePath(Resources.getResource("ssl/my-ca/server-ca.pem").getPath())
.tlsKeyFilePath(Resources.getResource("ssl/my-ca/server-key.pem").getPath())
.allowInsecureConnection(false)
.requireTrustedClientCertOnConnect(true)
.tlsEnabledWithKeystore(false)
.isHttps(true)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true, null,
new HashSet<String>() {
{
this.add("TLSv1.3");
}
},
600);
});
factory.setHostnameVerifier((s, sslSession) -> true);
ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand All @@ -123,25 +140,40 @@ public void testJettyTlsServerInvalidCipher() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory factory = JettySslContextFactory.createServerSslContext(
null,
false,
Resources.getResource("ssl/my-ca/ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-ca.pem").getPath(),
Resources.getResource("ssl/my-ca/server-key.pem").getPath(),
true,
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsCiphers(new HashSet<String>() {
{
this.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
}
})
.tlsProtocols(new HashSet<String>() {
{
this.add("TLSv1.3");
}
})
.tlsTrustCertsFilePath(Resources.getResource("ssl/my-ca/ca.pem").getPath())
.tlsCertificateFilePath(Resources.getResource("ssl/my-ca/server-ca.pem").getPath())
.tlsKeyFilePath(Resources.getResource("ssl/my-ca/server-key.pem").getPath())
.allowInsecureConnection(false)
.requireTrustedClientCertOnConnect(true)
.isHttps(true)
.tlsEnabledWithKeystore(false)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true,
new HashSet<String>() {
{
this.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
}
},
new HashSet<String>() {
{
this.add("TLSv1.2");
this.add("TLSv1.3");
}
},
600);

});
factory.setHostnameVerifier((s, sslSession) -> true);
ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.ssl.SslContextFactory;
Expand All @@ -66,10 +69,22 @@ public void testJettyTlsServerTls() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
keyStoreType, brokerKeyStorePath, keyStorePassword, false, keyStoreType,
clientTrustStorePath, keyStorePassword, true, null,
null, 600);
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsKeyStoreType(keyStoreType)
.tlsKeyStorePath(brokerKeyStorePath)
.tlsKeyStorePassword(keyStorePassword)
.tlsTrustStoreType(keyStoreType)
.tlsTrustStorePath(clientTrustStorePath)
.tlsTrustStorePassword(keyStorePassword)
.requireTrustedClientCertOnConnect(true)
.tlsEnabledWithKeystore(true)
.isHttps(true)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory.Server factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true, null, null);
factory.setHostnameVerifier((s, sslSession) -> true);
ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand All @@ -95,14 +110,32 @@ public void testJettyTlsServerInvalidTlsProtocol() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
keyStoreType, brokerKeyStorePath, keyStorePassword, false, keyStoreType, clientTrustStorePath,
keyStorePassword, true, null,
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsKeyStoreType(keyStoreType)
.tlsKeyStorePath(brokerKeyStorePath)
.tlsKeyStorePassword(keyStorePassword)
.tlsTrustStoreType(keyStoreType)
.tlsTrustStorePath(clientTrustStorePath)
.tlsTrustStorePassword(keyStorePassword)
.tlsProtocols(new HashSet<String>() {
{
this.add("TLSv1.3");
}
})
.requireTrustedClientCertOnConnect(true)
.tlsEnabledWithKeystore(true)
.isHttps(true)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory.Server factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true, null,
new HashSet<String>() {
{
this.add("TLSv1.3");
}
}, 600);
});
factory.setHostnameVerifier((s, sslSession) -> true);
ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand All @@ -127,18 +160,43 @@ public void testJettyTlsServerInvalidCipher() throws Exception {
@Cleanup("stop")
Server server = new Server();
List<ServerConnector> connectors = new ArrayList<>();
SslContextFactory.Server factory = JettySslContextFactory.createServerSslContextWithKeystore(null,
keyStoreType, brokerKeyStorePath, keyStorePassword, false, keyStoreType, clientTrustStorePath,
keyStorePassword, true, new HashSet<String>() {
PulsarSslConfiguration sslConfiguration = PulsarSslConfiguration.builder()
.tlsKeyStoreType(keyStoreType)
.tlsKeyStorePath(brokerKeyStorePath)
.tlsKeyStorePassword(keyStorePassword)
.tlsTrustStoreType(keyStoreType)
.tlsTrustStorePath(clientTrustStorePath)
.tlsTrustStorePassword(keyStorePassword)
.tlsCiphers(new HashSet<String>() {
{
this.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
}
})
.tlsProtocols(new HashSet<String>() {
{
this.add("TLSv1.3");
}
})
.requireTrustedClientCertOnConnect(true)
.tlsEnabledWithKeystore(true)
.isHttps(true)
.build();
PulsarSslFactory sslFactory = new DefaultPulsarSslFactory();
sslFactory.initialize(sslConfiguration);
sslFactory.createInternalSslContext();
SslContextFactory.Server factory = JettySslContextFactory.createSslContextFactory(null,
sslFactory, true,
new HashSet<String>() {
{
this.add("TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
}
},
new HashSet<String>() {
{
this.add("TLSv1.2");
this.add("TLSv1.3");
}
}, 600);
});
factory.setHostnameVerifier((s, sslSession) -> true);
ServerConnector connector = new ServerConnector(server, factory);
connector.setPort(0);
Expand Down
Loading

0 comments on commit 2d46bfa

Please sign in to comment.