Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 10f28df

Browse files
[security] Proxy to broker and broker to broker hostname verification (#57)
1 parent 9897a4b commit 10f28df

File tree

12 files changed

+63
-27
lines changed

12 files changed

+63
-27
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelInitializer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import static io.streamnative.pulsar.handlers.kop.KafkaChannelInitializer.MAX_FRAME_LENGTH;
1717
import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER;
1818

19+
import io.netty.channel.Channel;
1920
import io.netty.channel.ChannelInitializer;
2021
import io.netty.channel.socket.SocketChannel;
2122
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2223
import io.netty.handler.codec.LengthFieldPrepender;
2324
import io.netty.handler.ssl.SslHandler;
2425
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
2526
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
27+
import java.util.concurrent.CompletableFuture;
2628
import org.eclipse.jetty.util.ssl.SslContextFactory;
2729

2830
/**
@@ -50,13 +52,27 @@ public TransactionMarkerChannelInitializer(KafkaServiceConfiguration kafkaConfig
5052

5153
@Override
5254
protected void initChannel(SocketChannel ch) throws Exception {
53-
if (this.enableTls) {
54-
ch.pipeline().addLast(TLS_HANDLER,
55-
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory)));
56-
}
5755
ch.pipeline().addLast(lengthFieldPrepender);
5856
ch.pipeline().addLast("frameDecoder",
5957
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
6058
ch.pipeline().addLast("txnHandler", new TransactionMarkerChannelHandler(transactionMarkerChannelManager));
6159
}
60+
61+
protected CompletableFuture<Channel> initTls(Channel ch, String host, int port) {
62+
if (this.enableTls) {
63+
CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>();
64+
ch.eventLoop().execute(() -> {
65+
try {
66+
ch.pipeline().addFirst(TLS_HANDLER,
67+
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory, host, port)));
68+
initTlsFuture.complete(ch);
69+
} catch (Throwable t) {
70+
initTlsFuture.completeExceptionally(t);
71+
}
72+
});
73+
return initTlsFuture;
74+
} else {
75+
return CompletableFuture.completedFuture(ch);
76+
}
77+
}
6278
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public class TransactionMarkerChannelManager {
7878

7979
private final Bootstrap bootstrap;
8080

81+
private final TransactionMarkerChannelInitializer transactionMarkerChannelInitializer;
82+
8183
private final Map<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>> handlerMap =
8284
new ConcurrentHashMap<>();
8385

@@ -183,7 +185,9 @@ public TransactionMarkerChannelManager(String tenant,
183185
bootstrap = new Bootstrap();
184186
bootstrap.group(eventLoopGroup);
185187
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
186-
bootstrap.handler(new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this));
188+
transactionMarkerChannelInitializer =
189+
new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this);
190+
bootstrap.handler(transactionMarkerChannelInitializer);
187191
}
188192

189193
public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress socketAddress) {
@@ -193,7 +197,10 @@ public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketA
193197
ensureDrainQueuedTransactionMarkersActivity();
194198
return handlerMap.computeIfAbsent(socketAddress, address -> {
195199
CompletableFuture<TransactionMarkerChannelHandler> handlerFuture = new CompletableFuture<>();
196-
ChannelFutures.toCompletableFuture(bootstrap.connect(socketAddress))
200+
ChannelFutures.toCompletableFuture(bootstrap.register())
201+
.thenCompose(ch -> transactionMarkerChannelInitializer
202+
.initTls(ch, socketAddress.getHostString(), socketAddress.getPort()))
203+
.thenCompose(ch -> ChannelFutures.toCompletableFuture(ch.connect(socketAddress)))
197204
.thenAccept(channel -> {
198205
handlerFuture.complete(
199206
(TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler"));

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ssl/SSLUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,10 @@ public static SSLEngine createSslEngine(SslContextFactory.Server sslContextFacto
266266
return engine;
267267
}
268268

269-
public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory) throws Exception {
269+
public static SSLEngine createClientSslEngine(SslContextFactory.Client sslContextFactory,
270+
String host, int port) throws Exception {
270271
sslContextFactory.start();
271-
SSLEngine engine = sslContextFactory.newSSLEngine();
272+
SSLEngine engine = sslContextFactory.newSSLEngine(host, port);
272273
engine.setUseClientMode(true);
273274

274275
return engine;

proxy/src/main/java/io/streamnative/pulsar/handlers/kop/ConnectionToBroker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ private synchronized CompletableFuture<Channel> ensureConnection() {
117117
public void initChannel(SocketChannel ch) throws Exception {
118118
if (enableTls) {
119119
ch.pipeline().addLast(TLS_HANDLER,
120-
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory)));
120+
new SslHandler(SSLUtils.createClientSslEngine(sslContextFactory, brokerHost, brokerPort)));
121121
}
122122
ch.pipeline().addLast(new LengthFieldPrepender(4));
123123
ch.pipeline().addLast("frameDecoder",

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelTest.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,20 +85,22 @@ public KafkaSSLChannelTest(final String entryFormat, boolean withCertHost, boole
8585
* @param withCertHost the keystore with certHost or not.
8686
*/
8787
private void setSslConfigurations(boolean withCertHost) {
88-
String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "2" : "") + "/";
89-
if (!withCertHost) {
88+
String path = "./src/test/resources/ssl/certificate" + (withCertHost ? "" : "2") + "/";
89+
if (withCertHost) {
9090
this.kopSslKeystoreLocation = path + "broker.keystore.jks";
9191
this.kopSslKeystorePassword = "broker";
9292
this.kopSslTruststoreLocation = path + "broker.truststore.jks";
9393
this.kopSslTruststorePassword = "broker";
94+
this.kopClientTruststoreLocation = path + "broker.truststore.jks";
95+
this.kopClientTruststorePassword = "broker";
9496
} else {
9597
this.kopSslKeystoreLocation = path + "server.keystore.jks";
9698
this.kopSslKeystorePassword = "server";
9799
this.kopSslTruststoreLocation = path + "server.truststore.jks";
98100
this.kopSslTruststorePassword = "server";
101+
kopClientTruststorePassword = "client";
102+
kopClientTruststoreLocation = path + "client.truststore.jks";
99103
}
100-
kopClientTruststoreLocation = path + "client.truststore.jks";
101-
kopClientTruststorePassword = "client";
102104
}
103105

104106
@Factory
@@ -114,6 +116,10 @@ public static Object[] instances() {
114116
}
115117

116118
protected void sslSetUpForBroker() throws Exception {
119+
120+
// require TLS verification when hostname is on certificate
121+
conf.setTlsHostnameVerificationEnabled(withCertHost);
122+
117123
conf.setKafkaTransactionCoordinatorEnabled(true);
118124
conf.setKopTlsEnabledWithBroker(true);
119125
conf.setKopSslKeystoreType("JKS");
@@ -161,7 +167,7 @@ public void testKafkaProduceSSL() throws Exception {
161167
String messageStrPrefix = "Message_Kop_KafkaProduceKafkaConsume_" + partitionNumber + "_";
162168

163169
@Cleanup
164-
SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(),
170+
SslProducer kProducer = new SslProducer(topicName, getKafkaBrokerPortTls(), withCertHost,
165171
kopClientTruststoreLocation, kopClientTruststorePassword);
166172

167173
for (int i = 0; i < totalMsgs; i++) {
@@ -196,7 +202,8 @@ public static class SslProducer implements Closeable {
196202
private final KafkaProducer<Integer, String> producer;
197203
private final String topic;
198204

199-
public SslProducer(String topic, int port, String truststoreLocation, String truststorePassword) {
205+
public SslProducer(String topic, int port, boolean withCertHost, String truststoreLocation,
206+
String truststorePassword) {
200207
Properties props = new Properties();
201208
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + port);
202209
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoKafkaOnPulsarProducerSSL");
@@ -209,7 +216,7 @@ public SslProducer(String topic, int port, String truststoreLocation, String tru
209216
props.put("ssl.truststore.password", truststorePassword);
210217

211218
// default is https, here need to set empty.
212-
props.put("ssl.endpoint.identification.algorithm", "");
219+
props.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");
213220

214221
producer = new KafkaProducer<>(props);
215222
this.topic = topic;
@@ -241,7 +248,7 @@ public void basicProduceAndConsumeWithTxTest() throws Exception {
241248
producerProps.put("ssl.truststore.password", kopClientTruststorePassword);
242249

243250
// default is https, here need to set empty.
244-
producerProps.put("ssl.endpoint.identification.algorithm", "");
251+
producerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");
245252

246253
@Cleanup
247254
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
@@ -294,7 +301,7 @@ public void basicProduceAndConsumeWithTxTest() throws Exception {
294301
consumerProps.put("ssl.truststore.password", kopClientTruststorePassword);
295302

296303
// default is https, here need to set empty.
297-
consumerProps.put("ssl.endpoint.identification.algorithm", "");
304+
consumerProps.put("ssl.endpoint.identification.algorithm", withCertHost ? "HTTPS" : "");
298305

299306
final int totalMessageCount = totalTxnCount * messageCountPerTxn;
300307

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaSSLChannelWithClientAuthTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,6 @@ public SslProducer(String topic, int port) {
159159
props.put("ssl.keystore.location", "./src/test/resources/ssl/certificate/client.keystore.jks");
160160
props.put("ssl.keystore.password", "client");
161161

162-
// default is https, here need to set empty.
163-
props.put("ssl.endpoint.identification.algorithm", "");
164-
165162
producer = new KafkaProducer<>(props);
166163
this.topic = topic;
167164
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ public abstract class KopProtocolHandlerTestBase {
122122
@Getter
123123
protected int kafkaProxyPort = PortManager.nextFreePort();
124124
@Getter
125+
protected int kafkaProxyPortTls = PortManager.nextFreePort();
126+
@Getter
125127
protected int kafkaBrokerPortTls = PortManager.nextFreePort();
126128
@Getter
127129
protected int kafkaSchemaRegistryPort = PortManager.nextFreePort();
@@ -886,7 +888,7 @@ protected void startProxy() throws Exception {
886888
ProxyConfiguration proxyConfiguration = ConfigurationUtils.create(config, ProxyConfiguration.class);
887889

888890
proxyConfiguration.getProperties().put("kafkaListeners",
889-
PLAINTEXT_PREFIX + "localhost:" + kafkaProxyPort + ",");
891+
PLAINTEXT_PREFIX + "localhost:" + kafkaProxyPort + "," + SSL_PREFIX + "localhost:" + kafkaProxyPortTls);
890892
proxyConfiguration.setBrokerWebServiceURL("http://localhost:" + getBrokerWebservicePort());
891893

892894
// Map Pulsar port to KOP port

tests/src/test/java/io/streamnative/pulsar/handlers/kop/docker/PulsarContainer.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,21 @@ public void start() throws Exception {
140140

141141
// for KOP broker to broker communications via TLS
142142
pulsarContainer.withEnv("PULSAR_PREFIX_kopSslTruststoreLocation", "/pulsar/conf/ca.jks");
143-
pulsarContainer.withEnv("PULSAR_PREFIX_kopSslTruststorePassword", "");
143+
pulsarContainer.withEnv("PULSAR_PREFIX_kopSslTruststorePassword", "pulsar");
144144

145145
pulsarContainer.withEnv("PULSAR_PREFIX_brokerServiceURLTLS", "pulsar+ssl://pulsar:6651");
146146
pulsarContainer.withEnv("PULSAR_PREFIX_brokerWebServiceURLTLS", "https://pulsar:8443");
147147
pulsarContainer.withEnv("PULSAR_PREFIX_brokerServicePortTls", "6651");
148148
pulsarContainer.withEnv("PULSAR_PREFIX_webServicePortTls", "8443");
149149

150-
pulsarContainer.withEnv("PULSAR_PREFIX_tlsAllowInsecureConnection", "true");
151-
pulsarContainer.withEnv("PULSAR_PREFIX_tlsHostnameVerificationEnabled", "false");
150+
pulsarContainer.withEnv("PULSAR_PREFIX_tlsAllowInsecureConnection", "false");
151+
pulsarContainer.withEnv("PULSAR_PREFIX_tlsHostnameVerificationEnabled", "true");
152152

153153
pulsarContainer.withEnv("PULSAR_PREFIX_kopTlsEnabledWithBroker", "true");
154154
pulsarContainer.withEnv("PULSAR_PREFIX_tlsEnabledWithBroker", "true");
155+
pulsarContainer.withEnv("PULSAR_PREFIX_brokerClientTlsEnabledWithKeyStore", "true");
156+
pulsarContainer.withEnv("PULSAR_PREFIX_brokerClientTlsTrustStore", "/pulsar/conf/ca.jks");
157+
pulsarContainer.withEnv("PULSAR_PREFIX_brokerClientTlsTrustStorePassword", "pulsar");
155158
pulsarContainer.withEnv("PULSAR_PREFIX_brokerServiceURLTLS", "pulsar+ssl://pulsar:6651");
156159
pulsarContainer.withEnv("PULSAR_PREFIX_brokerWebServiceURLTLS", "https://pulsar:8443");
157160
}
@@ -246,10 +249,13 @@ public void start() throws Exception {
246249
// Proxy to broker communication
247250
proxyContainer.withEnv("PULSAR_PREFIX_kopTlsEnabledWithBroker", "true");
248251
proxyContainer.withEnv("PULSAR_PREFIX_tlsEnabledWithBroker", "true");
252+
proxyContainer.withEnv("PULSAR_PREFIX_brokerClientTlsEnabledWithKeyStore", "true");
253+
proxyContainer.withEnv("PULSAR_PREFIX_brokerClientTlsTrustStore", "/pulsar/conf/ca.jks");
254+
proxyContainer.withEnv("PULSAR_PREFIX_brokerClientTlsTrustStorePassword", "pulsar");
249255
proxyContainer.withEnv("PULSAR_PREFIX_kopSslTruststoreLocation", "/pulsar/conf/ca.jks");
250256
proxyContainer.withEnv("PULSAR_PREFIX_kopSslTruststorePassword", "pulsar");
251-
proxyContainer.withEnv("PULSAR_PREFIX_tlsAllowInsecureConnection", "true");
252-
proxyContainer.withEnv("PULSAR_PREFIX_tlsHostnameVerificationEnabled", "false");
257+
proxyContainer.withEnv("PULSAR_PREFIX_tlsAllowInsecureConnection", "false");
258+
proxyContainer.withEnv("PULSAR_PREFIX_tlsHostnameVerificationEnabled", "true");
253259
}
254260

255261
proxyContainer.start();
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)