diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml
index 3f3ad40d2672b..d8e58cd863b52 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -39,6 +39,12 @@
pulsar-client-admin-original
${project.version}
+
+ ${project.groupId}
+ pulsar-client-messagecrypto-bc
+ ${project.parent.version}
+ true
+
@@ -58,6 +64,7 @@
org.apache.pulsar:pulsar-client-original
+ org.apache.pulsar:pulsar-client-api
org.apache.pulsar:pulsar-client-admin-original
org.apache.commons:commons-lang3
commons-codec:commons-codec
@@ -92,6 +99,8 @@
org.yaml:snakeyaml
io.swagger:*
org.apache.bookkeeper:bookkeeper-common-allocator
+
+ org.apache.pulsar:pulsar-client-messagecrypto-bc
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 3694574319bd9..236ee7469d16c 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -33,11 +33,6 @@
Pulsar Client All
-
- ${project.groupId}
- pulsar-client-api
- ${project.parent.version}
-
${project.groupId}
pulsar-client-original
@@ -81,14 +76,6 @@
**/ProtobufSchema.class
${project.build.directory}/classes
-
- ${project.groupId}
- pulsar-client-api
- ${project.version}
- jar
- true
- ${project.build.directory}/classes
-
@@ -113,6 +100,7 @@
org.apache.pulsar:pulsar-client-original
+ org.apache.pulsar:pulsar-client-api
org.apache.pulsar:pulsar-client-admin-original
org.apache.commons:commons-lang3
commons-codec:commons-codec
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 27f48546673db..32eeccd2eb427 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -104,6 +104,7 @@
org.apache.pulsar:pulsar-client-original
+ org.apache.pulsar:pulsar-client-api
org.apache.bookkeeper:bookkeeper-common-allocator
org.apache.commons:commons-lang3
commons-codec:commons-codec
diff --git a/tests/pulsar-client-admin-shade-test/pom.xml b/tests/pulsar-client-admin-shade-test/pom.xml
index bb74588c4c50b..865f86bb43d3b 100644
--- a/tests/pulsar-client-admin-shade-test/pom.xml
+++ b/tests/pulsar-client-admin-shade-test/pom.xml
@@ -48,6 +48,13 @@
test
+
+ org.apache.pulsar
+ pulsar-client-messagecrypto-bc
+ ${project.version}
+ test
+
+
diff --git a/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
new file mode 100644
index 0000000000000..0c0efa3819319
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+ private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+ private PulsarContainer pulsarContainer;
+ private URI lookupUrl;
+ private PulsarClient pulsarClient;
+
+ @BeforeClass
+ public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+ Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+
+ pulsarContainer = new PulsarContainer();
+ pulsarContainer.start();
+ pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+ .build();
+ lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+ admin.namespaces().createNamespace("my-property/my-ns");
+ admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+ admin.close();
+ }
+
+ @AfterClass
+ public void cleanup() throws PulsarClientException {
+ pulsarClient.close();
+ pulsarContainer.stop();
+ pulsarContainer.close();
+ }
+
+ private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+ }
+
+ @Test
+ public void testRSAEncryption() throws Exception {
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ final int totalMsg = 10;
+
+ Set messageSet = Sets.newHashSet();
+ Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+ Consumer normalConsumer = pulsarClient.newConsumer()
+ .topic(topicName).subscriptionName("my-subscriber-name-normal")
+ .subscribe();
+
+ Producer producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+ Producer producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+ for (int i = 0; i < totalMsg; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+ for (int i = totalMsg; i < totalMsg * 2; i++) {
+ String message = "my-message-" + i;
+ producer2.send(message.getBytes());
+ }
+
+ MessageImpl msg = null;
+
+ msg = (MessageImpl) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+ // should not able to read message using normal message.
+ assertNull(msg);
+
+ for (int i = 0; i < totalMsg * 2; i++) {
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
+
+ protected void testMessageOrderAndDuplicates(Set messagesReceived, T receivedMessage,
+ T expectedMessage) {
+ // Make sure that messages are received in order
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+ // Make sure that there are no duplicates
+ Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+ }
+
+ @Test
+ public void testRedeliveryOfFailedMessages() throws Exception {
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+ .build();
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ class InvalidKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) {
+ return null;
+ }
+ }
+
+ /*
+ * Redelivery functionality guarantees that customer will get a chance to process the message again.
+ * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it.
+ *
+ * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers
+ * - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
+ *
+ * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message.
+ *
+ * Consumer 1 - Can decrypt message
+ * Consumer 2 - Has invalid Reader configured.
+ * Consumer 3 - Has no reader configured.
+ *
+ */
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+ Producer producer = pulsarClient.newProducer().topic(topicName)
+ .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ int numberOfMessages = 100;
+ String message = "my-message";
+ Set messages = new HashSet(); // Since messages are in random order
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send((message + i).getBytes());
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ Message m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ // All messages would be received by consumer 1
+ m = consumer1.receive();
+ messages.add(new String(m.getData()));
+ consumer1.acknowledge(m);
+ }
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ // checking if all messages were received
+ for (int i = 0; i < numberOfMessages; i++) {
+ assertTrue(messages.contains((message + i)));
+ }
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ newPulsarClient.close();
+ newPulsarClient1.close();
+ newPulsarClient2.close();
+ }
+
+ @Test
+ public void testEncryptionFailure() throws Exception {
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+ }
+ }
+ return null;
+ }
+ }
+
+ final int totalMsg = 10;
+
+ MessageImpl msg = null;
+ Set messageSet = Sets.newHashSet();
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ // 1. Invalid key name
+ try {
+ pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+ Assert.fail("Producer creation should not suceed if failing to read key");
+ } catch (Exception e) {
+ // ok
+ }
+
+ // 2. Producer with valid key name
+ Producer producer = pulsarClient.newProducer()
+ .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < totalMsg; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ // 3. KeyReder is not set by consumer
+ // Receive should fail since key reader is not setup
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(msg, "Receive should have failed with no keyreader");
+
+ // 4. Set consumer config to consume even if decryption fails
+ consumer.close();
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ int msgNum = 0;
+ try {
+ // Receive should proceed and deliver encrypted message
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ String expectedMessage = "my-message-" + msgNum++;
+ Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
+ + " should not match the expected message " + expectedMessage);
+ consumer.acknowledgeCumulative(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
+ }
+
+ // 5. Set keyreader and failure action
+ consumer.close();
+ // Set keyreader
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+ .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ for (int i = msgNum; i < totalMsg - 1; i++) {
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ // 6. Set consumer config to discard if decryption fails
+ consumer.close();
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ // Receive should proceed and discard encrypted messages
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
+ }
+
+ @Test
+ public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ Producer producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic3")
+ .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic3")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+
+ String message = "my-message";
+ producer.send(message.getBytes());
+
+ TopicMessageImpl msg = (TopicMessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+
+ String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
+ assertEquals(message, receivedMessage);
+
+ consumer.close();
+ }
+
+ private String decryptMessage(TopicMessageImpl msg, String encryptionKeyName, CryptoKeyReader reader)
+ throws Exception {
+ Optional ctx = msg.getEncryptionCtx();
+ Assert.assertTrue(ctx.isPresent());
+ EncryptionContext encryptionCtx = ctx
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+
+ Map keys = encryptionCtx.getKeys();
+ assertEquals(keys.size(), 1);
+ EncryptionContext.EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+ byte[] dataKey = encryptionKey.getKeyValue();
+ Map metadata = encryptionKey.getMetadata();
+ String version = metadata.get("version");
+ assertEquals(version, "1.0");
+
+ CompressionType compressionType = encryptionCtx.getCompressionType();
+ int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+ byte[] encrParam = encryptionCtx.getParam();
+ String encAlgo = encryptionCtx.getAlgorithm();
+ int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+ ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+ // try to decrypt use default MessageCryptoBc
+ MessageCrypto crypto = new MessageCryptoBc("test", false);
+ PulsarApi.MessageMetadata.Builder metadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+ PulsarApi.EncryptionKeys.Builder encKeyBuilder = PulsarApi.EncryptionKeys.newBuilder();
+ encKeyBuilder.setKey(encryptionKeyName);
+ ByteString keyValue = ByteString.copyFrom(dataKey);
+ encKeyBuilder.setValue(keyValue);
+ PulsarApi.EncryptionKeys encKey = encKeyBuilder.build();
+ metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+ metadataBuilder.setEncryptionAlgo(encAlgo);
+ metadataBuilder.setProducerName("test");
+ metadataBuilder.setSequenceId(123);
+ metadataBuilder.setPublishTime(12333453454L);
+ metadataBuilder.addEncryptionKeys(encKey);
+ metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType));
+ metadataBuilder.setUncompressedSize(uncompressedSize);
+ ByteBuf decryptedPayload = crypto.decrypt(() -> metadataBuilder.build(), payloadBuf, reader);
+
+ // try to uncompress
+ CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
+
+ if (batchSize > 0) {
+ PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+ .newBuilder();
+ uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+ singleMessageMetadataBuilder, 0, batchSize);
+ }
+
+ byte[] data = new byte[uncompressedPayload.readableBytes()];
+ uncompressedPayload.readBytes(data);
+ uncompressedPayload.release();
+ return new String(data);
+ }
+
+}
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.crt b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.crt
new file mode 100644
index 0000000000000..2d7d156866a86
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDVjCCAj4CCQCtw/UnTFDT7DANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJB
+VTETMBEGA1UECAwKU29tZS1TdGF0ZTEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBmNsaWVu
+dDAeFw0xNjA2MjAwMTQ1NDZaFw0yNjA2MTgwMTQ1NDZaMG0xCzAJBgNVBAYTAkFV
+MRMwEQYDVQQIDApTb21lLVN0YXRlMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxITAf
+BgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqQV5F3Au9FWXIYPdWqiX
+Rk5gdVmVkDuuFK4ZoOd8inoJpB3PPkpmpgoVkKQHDFhgx3ODGWIUgo+n6QDsJxY4
+ygHfVeggQgek8iUfteYVsIcHS0bjkhIij/3ihC301FkiqbrV069oLvUXLKcv3zxG
+mdBAiz0k4xGZhFieVRvQCLY9syUUxmQ/3Cv42lDY8a1gTw4CRRx/hCfDvXCKhOT4
+bMwUIDZfHB3JoDh3Thp8FLz0nTrRF75mSQJ/OdcafIm0Xoz2Otp/CSxLS+U1lLvG
+05crWTDe0om7NW4mK4CqGCFq5gUw7eIzaeO7Q5Qez9XGTMzkgIDTMvNYGGEeJhhm
+NQIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAKXy4g6hljY5MpO8mbZh+uJHq6NEUs
+4dr7OKDDWc39AROZsGf2eFUmHOjmRSw7VHpguGKI+rFRELVffpg/VvMh5apu+DBf
+jhxtDNceAyh5uugPNUJHXyeikBDYW8bAzUU3DmMldPkTZWcGjurmyhDQ1TtK2YJe
+RMFBXw5aAzdJMNi6OfXDH/ZX32hrb482yghDZj+ndnm0FefmLbFTQRMF8/fIHb1W
+kqNHwIaapZwH6j/MJy/TRFYcJunrBUYT9zVjY46k3GU0ex/Bn7T4pg9gzgFGZJhn
+jQQFKliIC84thCzdlPkrLduLY8tmlDKpLXatbEQ+s1MmNOURm6irPp6g
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.csr b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.csr
new file mode 100644
index 0000000000000..e01f33ef073f6
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICsjCCAZoCAQAwbTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
+FTATBgNVBAcMDERlZmF1bHQgQ2l0eTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
+DwAwggEKAoIBAQCpBXkXcC70VZchg91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+
+SmamChWQpAcMWGDHc4MZYhSCj6fpAOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOS
+EiKP/eKELfTUWSKputXTr2gu9Rcspy/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTG
+ZD/cK/jaUNjxrWBPDgJFHH+EJ8O9cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEX
+vmZJAn851xp8ibRejPY62n8JLEtL5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt
+4jNp47tDlB7P1cZMzOSAgNMy81gYYR4mGGY1AgMBAAGgADANBgkqhkiG9w0BAQUF
+AAOCAQEAk3eueaq/gonBzKH75oWHlqPbMZQFk4NXqx8h24ZfkCzPEFPyDM+jdQxv
+8vDtyWq+fizqAQmGrM7WPHgnTbmZyovfmwuKwtTlkD/8t7XpTmm9fYspbL4WzdP1
+y8/Vug09te+rni+v+kjk5b9IceEy6kLvXuzirE6c4LunAm+thrr5gWmsx1pyDiq7
+W2M15UZrm/paaCg6cVaMFdXCRZP+g1P4NcgDUe2TyFbLlhOJNtX3DJRZWEhrkEYK
+mRz2tJuiuitCzheAgRrFXepRagHKYffNSas1n/2kIc9QpZ8654kxsAzEwL7CnHd/
+SHbMS9dfP+uM6DACwcvngSOBMJ9KMg==
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.key b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.key
new file mode 100644
index 0000000000000..34fc701c5257d
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/client.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCpBXkXcC70VZch
+g91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+SmamChWQpAcMWGDHc4MZYhSCj6fp
+AOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOSEiKP/eKELfTUWSKputXTr2gu9Rcs
+py/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTGZD/cK/jaUNjxrWBPDgJFHH+EJ8O9
+cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEXvmZJAn851xp8ibRejPY62n8JLEtL
+5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt4jNp47tDlB7P1cZMzOSAgNMy81gY
+YR4mGGY1AgMBAAECggEAcJj3yVhvv0/BhY8+CCYl2K1f7u1GCLbpSleNNTbhLbMM
+9yrwo/OWnGg9Y4USOPQrTNOz81X2id+/oSZ/K67PGCvVJ3qi+rny9WkrzdbAfkAF
+6O0Jr4arRbeBjkK7Rjc3M1EHH6VLx3R5AsNBzfpuogss5FVQXICd/5+1oscLeLEx
+/Fn+51IEn9FUg5vr7ElG51f+zPxexcWHLNoqGjTEIGGtI8/CfTzD9tBV4sIjf/Nc
+Zzfs9XYrChfcrS0U1zDa+L7c5gYfoN6M08sBiuZlhyyO9wgzPlp+XnsrSFv6hUta
+0scjAbN4bh+orQn6zgFN/sjkQnraWXW7pKFLyTR/IQKBgQDVju4IbhE9XRweNgXi
+s3BuGV+HsuFffEf0904/zCuCUcScGb5WCz5+KtlFJ//YxfocHVZajH+4GdCGbWim
+m+H3XvRpWgfK/aBNOXu5ueLbnPYyPjTrcpKRsomeoiV+Jz1tv5PQElwzCiCzVvQf
+fMyhQT16YIsFQAGJzQMBEHWODQKBgQDKnKps3sKSR3ycUtIxCVXUir7p52qst0Pm
+bPO8JrcRKZP2z8MJB96+DcQFzrxj7t5DDktkYEsFOPPuIeUsYXsY+MKHs4hEQVCz
+hpDJJNQ8s+SV8TLzKpinZEmLIjslLbn2rQrpqybPg84VxqX3qqM8IrXhMf77aGj6
+QHqvQwHWyQKBgQDF1RVO+9++j82ncvY6z22coKath5leIjxqgtqbISFBJUxUK0j2
+Xo4yxLDnbqmE/8m1V7wSP8tlGYzhquLiTM+kn/Mc0Ukc0503TMQABmJQfXRYkOXn
+IwkCLXltWdoPpnwyeeGNRCTjJ0OpvyiBLtRFobE498xxPZzvMdrRlpS/1QKBgQCo
+wmMleUnBQ2/kWQugMnFeLg6kjs+IesFAnYFKN0kGL4aB7j06OWbrEFY0rCS4bA6O
+9coQGjCCchSjRXI4TB2XCCQnmX8nsuuADNZt45Iv2XrM9XEFn3Y0/tBO5j0zU2nw
+r+NGC/uwns050BMPPf7mqNarctQ6HZZK0wgdEQfoGQKBgC+pbkQv9cn68TsiaJ3w
+tvNRTXCIAAH4Vtn9Cp+63ao+kXn94BJqQF99i58kJpG4ol6wbCHUoC6fHgxUh5HB
+JB0HjC2eCMgn4acAQg0sPW6l35KX36yYxtrL7eosB/yBYum0XAwmboNjEhlCZkOs
+YOpSsn61g7xqqrt40Spb5vUn
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
new file mode 100644
index 0000000000000..58ab3d4ff9be8
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
@@ -0,0 +1,13 @@
+-----BEGIN EC PARAMETERS-----
+MIGXAgEBMBwGByqGSM49AQECEQD////9////////////////MDsEEP////3/////
+//////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2
+eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1
+ow0bkDihFQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIHYAgEBBBDeu9hc8kOvL3pl+LYSjLq9oIGaMIGXAgEBMBwGByqGSM49AQECEQD/
+///9////////////////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZ
+PCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bP
+Wsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1ow0bkDihFQIBAaEkAyIABOsqPpE8
+cY80pxkog5xw3i2AQ0yfV3MqMusxlOQnigBp
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000000000..3e2831ae8a398
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
@@ -0,0 +1,29 @@
+-----BEGIN EC PARAMETERS-----
+MIIBwgIBATBNBgcqhkjOPQEBAkIB////////////////////////////////////
+//////////////////////////////////////////////////8wgZ4EQgH/////
+////////////////////////////////////////////////////////////////
+/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh
+Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5
+MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte
+d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY
+9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC
+QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3
+CaXQO7XJuImcR667b7cekThkCQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIICnQIBAQRCAeNLEp1HefZ1nMl5vvgFMsJCd5ieCWqPT7TXbQkn27A8WkyAGTYC
+GtolyPokOgSjbJh+ofBt/MgvE/nMrqzmkZVtoIIBxjCCAcICAQEwTQYHKoZIzj0B
+AQJCAf//////////////////////////////////////////////////////////
+////////////////////////////MIGeBEIB////////////////////////////
+//////////////////////////////////////////////////////////wEQVGV
+PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1
+c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3
+BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB
+hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu
+cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB////////////////////
+///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4
+ZAkCAQGhgYkDgYYABAFhUHeaHfIWre/pPmv2a2l891co79dFpg6ixPRg+Y5qe0C7
+src//LT/ZR5rgj8ne+YcaIlwyQRl5OYEd25n799IcgHIBTGyaLB6Td5mW/oWT/Fz
+soufOnUJ7O/kDHjIQ15sczk3rDhe8/mB9zPjKlKTuAl5jBEt6E3yiB44Dtng02xD
+uQ==
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-rsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
new file mode 100644
index 0000000000000..a0d589e0e2a4c
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAtKWwgqdnTYrOCv+j1MkTWfSH0wCsHZZca9wAW3qP4uuhlBvn
+b10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhURHM4yuXqzCxzuAwkQSo39rzX8PGC
+7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9ER1e5O/uEPAotP933hHQ0J2hMEek
+HqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+cMd8ZbGScPqI9M38bmn3OLxFTn1vt
+hpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs7ppZnRbv8Hc/DFdOKVQIgam6CDdn
+NKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQGQIDAQABAoIBAAaJFAi2C7u3cNrf
+AstY9vVDLoLIvHFZlkBktjKZDYmVIsRb+hSCViwVUrWLL67R6+Iv4eg4DeTOAx00
+8pncXKgZTw2wIb1/QjR/Y/RjlaC8lkdmRWli7udMQCZVsyhuSjW6Pj7vr8YE4woj
+FhNijxEGcf9wWrmMJrzdnTWQiXByo+eTvUQ9BPgPGrRjsMZmTkLyAVJff2DfxO5b
+IWFDYDJcyYAMCIMQu7vys/I50ou6ilb1CO6QM6Z7KpPeOoVFPwtzbh8cf9xM8UNS
+j6J/JmdWhgI34GS3NA68xTQ6PV7zjnhCc+iccm3JKyzGXwaApAZ+Eoce/9j4WKmu
+5B4ziR0CgYEA3l/9OHbl1zmyV+rRxWOIj/i2rTvHzwBnbnPJyuemL5VMFdpGodQ3
+vwHvyQmcECRVRxmXojQ4QuPPHs3qp6wEEFPCWxChLSTxlUc85SOFHWU2O99jV7zI
+7+JOpDK/Mstsx9nHgXduJF+glTFtA3LH8Oqylzu2aFPsprwKuZf94Q8CgYEAz/Zx
+akEG+PEMtP5YS28cX5XfjsIX/V26Fs6/sH16QjUIEddE5T4fCuokxCjSiwUcWhml
+pHEJ5S5xp3VYRfISW3jRW3qstIH1tpZipB6+S0zTuJmLJbA3IiWEg2rtMt7X1uJv
+A/bYOqe0hOPTuXuZdtVZ0nMTKk7GG8O6VkBI7FcCgYEAkDfCmscJgs7JahlBWHmX
+zH9pwem+SPKjIc/4NB6N+dgikx2Pp05hpP/VihUwYIufvs/LNogVYNQrtHepUnrN
+2+TmbHbZgNSv1Ldxt82UfB7y0FutKu6lhmXHyNecho3Fi8sih0V0aiSWmYuHfrAH
+GaiskEZKo1iiZvQXJIx9O2MCgYATBf0r9hTYMtyxtc6H3/sdd01C9thQ8gDy0yjP
+0Tqc0dMSJroDqmIWkoKYew9/bhFA4LW5TCnWkCAPbHmNtG4fdfbYwmkH/hdnA2y0
+jKdlpfp8GXeUFAGHGx17FA3sqFvgKUh0eWEgRHUL7vdQMVFBgJS93o7zQM94fLgP
+6cOB8wKBgFcGV4GjI2Ww9cillaC554MvoSjf8B/+04kXzDOh8iYIIzO9EUil1jjK
+Jvxp4hnLzTKWbux3MEWqurLkYas6GpKBjw+iNOCar6YdqWGVqM3RUx7PTUaZwkKx
+UdP63IfY7iZCIT/QbyHQvIUe2MaiVnH+ulxdkK6Y5e7gxcbckIH4
+-----END RSA PRIVATE KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
new file mode 100644
index 0000000000000..5aeb429a3d695
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
@@ -0,0 +1,7 @@
+-----BEGIN PUBLIC KEY-----
+MIHKMIGjBgcqhkjOPQIBMIGXAgEBMBwGByqGSM49AQECEQD////9////////////
+////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1N
+aW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd
+7XqDAhEA/////gAAAAB1ow0bkDihFQIBAQMiAATrKj6RPHGPNKcZKIOccN4tgENM
+n1dzKjLrMZTkJ4oAaQ==
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000000000..6fc427b1b19dc
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-rsa.pem b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
new file mode 100644
index 0000000000000..6fc427b1b19dc
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.crt b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.crt
new file mode 100644
index 0000000000000..59b651be2a406
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDLjCCAhYCCQDn/Yvym+FMsDANBgkqhkiG9w0BAQUFADBZMQswCQYDVQQGEwJB
+VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMRIwEAYDVQQDEwlsb2NhbGhvc3QwHhcNMTYwNjEzMjIyMTQ2WhcN
+MjYwNjExMjIyMTQ2WjBZMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0
+ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRIwEAYDVQQDEwls
+b2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCs29IuzZvk
+OGUkS/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du99
+8xbgeihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT
+1DepD/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQr
+mTbLzzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J
+92K+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNb
+O9hVS4bqqsw9AgMBAAEwDQYJKoZIhvcNAQEFBQADggEBACE0WBuTbHcPtYKv2ZMS
+mYk9jvtAhmWHQ6tNqV8CmS2AsrzZdWglGaqIRsm5slkD2BGeQS+BesTArUuENTmP
+r9kJSecdiiB8aWtLbhoCSH3QR6IW/b5UVl6sR5OIh7SkNTjMSUSDnMEVLNGyKZGS
+gCGVbDf3n5KhOTnwqguELRykynKFt2LVksBia9+88lUtiRHpbyClo/KVWltJlaww
+PT0WEpwqVUcHmwrR3MTzJDEPvIplSgxdaDmFGYS1YKm9T/wQd+t/0DbXMmfJXBbd
+FVUnB6o7qJVU9N2Tbaj9NbCtwz5nTZG4A5kRXWHVjZsn5WzLuS/me3rDXjwlfB2p
+ipY=
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.csr b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.csr
new file mode 100644
index 0000000000000..8782222c5ab46
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICnjCCAYYCAQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUx
+ITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAxMJbG9j
+YWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArNvSLs2b5Dhl
+JEv8Kis3f4dnrKowko8OEiy23odRgN1BL4dZPfnkYmMCRtcgjh5nLKeAPA7vffMW
+4HooaVlsO6Yy+VVZMt3Irrx8s9QnOmykabPxs1SQcBT9IMx5yvG1DPOlOdshk9Q3
+qQ/2NpVEctIoHVXADIxsHjMKE5omlXMGXsX2LhHn/o8935qZjsD/a26CAPa0K5k2
+y885GwMGV3Zfm0byHnyIyGsyM0zrPLSVq4RWkyVIAYYS4xSME3aUvp/hajrvCfdi
+vj7yoyHid5Jp2ckRyu7onZQJ9IPwyvnKNbF98zkTwlnoNtSvv71R04/dcwETWzvY
+VUuG6qrMPQIDAQABoAAwDQYJKoZIhvcNAQEFBQADggEBAEPHySnpf3E/7tZsiDka
+rqdB/sU7fdqjyV0iy0cuKQkU8WYrsE7bHkqMYc8CiIDfWhIGW5Jnzups2O6eH0Sx
+2BS21ARFiNGC1UfY1HSV2zrTNh3RqQa3YsXzv9vvdQ/gjsqGDuGDIc1yAA+Ytdja
+3rhIzEVqBhiLzg+M2+gW1zs+Kqj0Zo0pLB2uqhdZJmjxBb2FCli50vCVEhqIS3RO
+KTE+AJfxThWIeahFyVaskaEGkS6NVr2JihV0elbKolH19k2UzRTVn7p3Ixh5ojuW
+gtU/90vOy/SDkSRmCWMqgkUKJ2oeImleHdrvwNyrzvrLWRAz6R5yGQJwji9kKpHD
+FK0=
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.key b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.key
new file mode 100644
index 0000000000000..6da70f5aec3b5
--- /dev/null
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/certificate/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCs29IuzZvkOGUk
+S/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du998xbg
+eihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT1Dep
+D/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQrmTbL
+zzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J92K+
+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNbO9hV
+S4bqqsw9AgMBAAECggEAd/LuDeZFZ/+uR5qmuAhXMZqfWZSbsges5vW6S/6wkvB1
+vGp6heQzFAbKXKgJgjUcuULeXE6s58RYuppqEnin/1hcBOKxy/dUu9Q14H+2XPdo
+u6TPcvaaZ/xYjnr1hNtnHD6yB8zEpxVbLmjSHJxF7Dti9MA9TTfgCrC2LFYKsicD
+/5AQyHuwpHyTL3Iiwv4Qtks/SD2a3fu8lD0yTQwA/hY6/0ieXxXd9tZV5a6GSA0P
+nieol1byfuX7Q5fb8ggPd9u9K1mVZTBRKiE5w+uU4Ic2IkBmZX5ZuRS+vFplpLsY
+YpFPvzFmpNkpK2SdYjJ+V4tkJsFHmOaFRgW/0QB2DQKBgQDeQMSZBQlPUrgRdWHN
+OyvTcrSvXzg5DbaIj39tgdNZ6PYns/thD0n707KGRJOChIyYiiKxLxzLWdPUxqQO
+rNLUV9IkMVc/QZR8RUqGc2BxmPOxAprhzeOhLsyqP/sgtxRHAnLqmkXuHYoxvTZ6
+LFCRCZBpEJrutGxl3s/x+sfkuwKBgQDHGwnSmvArpL8ZY1dV4xKNkxifCBnNmqAl
+TKHPW3odN9nkMECEt1XUIioUUKXUsiAZNp5xa/v1DEyJ4f2T20QKcAGbS18b1M5W
+axIoH3IhyLo74tuo0fthgq5bzypfFOlIjo7F9mpEky/461RWmoNAAlp9+FkDi48C
+KwjAk39/ZwKBgQDXFJqs8sDFsOlMi+nvsHmDERhmNqG0JN8mXKgWk3KzKc09MuHs
+Vd1lBMNZSHfv8NIWtGdKTKty5yUmXm1ZfkoxECPevpkOMCq/8FZksrb8d+YswLae
+Gp9U1nNdtrkSOdo3tdj7y/wsqQ2ZgOB9bvEwyq6j3lvw8U2NcAiQxf44DQKBgBHb
+lPf0uZHQhutKA61KXoGgLdclrNrKAY8W3nRwqfUw6zQSN9cvcl1Cay/DQ/xdtY9N
+XMyjeMezwLGlOU8nnWSqQxqgmfkvDwqlM82xdFUfYcS5RiZQHxHR3L2TSSOaBoph
+buDGhyV7ZhQXV0slNJxrGZ6uxZ0RyVPSdEiBcjAFAoGBAJqZ6uCVHpv/FwZVggu7
+Xb9EIxZnLSmXwaXFpJoMZpRpKb8cSTTJbgSMv3Dq2LcNKYXdNBhgKgPSc/XipXt9
+ZdT36KWipV+PzW691kUiWHtA8/+E0LCi4Y7rlcBMz9PgDNXK4XMMZOVKxDqPcHSJ
+P6y01ku7T2X+abUiJ334Hg6G
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-admin-shade-test/src/test/resources/pulsar.xml b/tests/pulsar-client-admin-shade-test/src/test/resources/pulsar.xml
index cdb099d034121..e32e9b464a15a 100644
--- a/tests/pulsar-client-admin-shade-test/src/test/resources/pulsar.xml
+++ b/tests/pulsar-client-admin-shade-test/src/test/resources/pulsar.xml
@@ -25,6 +25,7 @@
+
diff --git a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
index 72453853d54e8..dfcbbc563b894 100644
--- a/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
+++ b/tests/pulsar-client-all-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
@@ -94,7 +94,7 @@ private PulsarClient newPulsarClient(String url, int intervalInSecs) throws Puls
@Test
public void testRSAEncryption() throws Exception {
- String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis();
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
class EncKeyReader implements CryptoKeyReader {
@@ -289,7 +289,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map metad
int numberOfMessages = 100;
String message = "my-message";
Set messages = new HashSet(); // Since messages are in random order
- for (int i = 0; i metad
m = consumer3.receive(3, TimeUnit.SECONDS);
assertNull(m);
- for (int i = 0; i metad
assertNull(m);
// checking if all messages were received
- for (int i = 0; i keyMe
msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
}
+
@Test(groups = "encryption")
public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
diff --git a/tests/pulsar-client-shade-test/pom.xml b/tests/pulsar-client-shade-test/pom.xml
index 5917fc54eebbb..77b1b3ae96d55 100644
--- a/tests/pulsar-client-shade-test/pom.xml
+++ b/tests/pulsar-client-shade-test/pom.xml
@@ -43,6 +43,13 @@
test
+
+ org.apache.pulsar
+ pulsar-client-admin
+ ${project.version}
+ test
+
+
org.testcontainers
testcontainers
diff --git a/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
new file mode 100644
index 0000000000000..1b91094025264
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.shade.com.google.common.collect.Maps;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.*;
+
+public class SimpleProducerConsumerTest {
+ private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
+
+ private PulsarContainer pulsarContainer;
+ private URI lookupUrl;
+ private PulsarClient pulsarClient;
+
+ @BeforeClass
+ public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException {
+ Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+ pulsarContainer = new PulsarContainer();
+ pulsarContainer.start();
+ pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+ .build();
+ lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl());
+
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build();
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone")));
+ admin.namespaces().createNamespace("my-property/my-ns");
+ admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone"));
+ admin.close();
+ }
+
+ @AfterClass
+ public void cleanup() throws PulsarClientException {
+ pulsarClient.close();
+ pulsarContainer.stop();
+ pulsarContainer.close();
+ }
+
+ private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
+ return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
+ }
+
+ @Test
+ public void testRSAEncryption() throws Exception {
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis();
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ final int totalMsg = 10;
+
+ Set messageSet = Sets.newHashSet();
+ Consumer consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe();
+ Consumer normalConsumer = pulsarClient.newConsumer()
+ .topic(topicName).subscriptionName("my-subscriber-name-normal")
+ .subscribe();
+
+ Producer producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+ Producer producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+ .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+
+ for (int i = 0; i < totalMsg; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+ for (int i = totalMsg; i < totalMsg * 2; i++) {
+ String message = "my-message-" + i;
+ producer2.send(message.getBytes());
+ }
+
+ MessageImpl msg = null;
+
+ msg = (MessageImpl) normalConsumer.receive(500, TimeUnit.MILLISECONDS);
+ // should not able to read message using normal message.
+ assertNull(msg);
+
+ for (int i = 0; i < totalMsg * 2; i++) {
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
+
+ protected void testMessageOrderAndDuplicates(Set messagesReceived, T receivedMessage,
+ T expectedMessage) {
+ // Make sure that messages are received in order
+ Assert.assertEquals(receivedMessage, expectedMessage,
+ "Received message " + receivedMessage + " did not match the expected message " + expectedMessage);
+
+ // Make sure that there are no duplicates
+ Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
+ }
+
+ @Test
+ public void testRedeliveryOfFailedMessages() throws Exception {
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl())
+ .build();
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ class InvalidKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map metadata) {
+ return null;
+ }
+ }
+
+ /*
+ * Redelivery functionality guarantees that customer will get a chance to process the message again.
+ * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it.
+ *
+ * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers
+ * - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
+ *
+ * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message.
+ *
+ * Consumer 1 - Can decrypt message
+ * Consumer 2 - Has invalid Reader configured.
+ * Consumer 3 - Has no reader configured.
+ *
+ */
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic2";
+
+ Producer producer = pulsarClient.newProducer().topic(topicName)
+ .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
+ Consumer consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+ int numberOfMessages = 100;
+ String message = "my-message";
+ Set messages = new HashSet(); // Since messages are in random order
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send((message + i).getBytes());
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ Message m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ // All messages would be received by consumer 1
+ m = consumer1.receive();
+ messages.add(new String(m.getData()));
+ consumer1.acknowledge(m);
+ }
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ // checking if all messages were received
+ for (int i = 0; i < numberOfMessages; i++) {
+ assertTrue(messages.contains((message + i)));
+ }
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ newPulsarClient.close();
+ newPulsarClient1.close();
+ newPulsarClient2.close();
+ }
+
+ @Test
+ public void testEncryptionFailure() throws Exception {
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ log.error("Failed to read certificate from {}", CERT_FILE_PATH);
+ }
+ }
+ return null;
+ }
+ }
+
+ final int totalMsg = 10;
+
+ MessageImpl msg = null;
+ Set messageSet = Sets.newHashSet();
+ Consumer consumer = pulsarClient.newConsumer()
+ .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ // 1. Invalid key name
+ try {
+ pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
+ Assert.fail("Producer creation should not suceed if failing to read key");
+ } catch (Exception e) {
+ // ok
+ }
+
+ // 2. Producer with valid key name
+ Producer producer = pulsarClient.newProducer()
+ .topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .addEncryptionKey("client-rsa.pem")
+ .cryptoKeyReader(new EncKeyReader())
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < totalMsg; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ // 3. KeyReder is not set by consumer
+ // Receive should fail since key reader is not setup
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(msg, "Receive should have failed with no keyreader");
+
+ // 4. Set consumer config to consume even if decryption fails
+ consumer.close();
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ int msgNum = 0;
+ try {
+ // Receive should proceed and deliver encrypted message
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ String expectedMessage = "my-message-" + msgNum++;
+ Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
+ + " should not match the expected message " + expectedMessage);
+ consumer.acknowledgeCumulative(msg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Failed to receive message even after ConsumerCryptoFailureAction.CONSUME is set.");
+ }
+
+ // 5. Set keyreader and failure action
+ consumer.close();
+ // Set keyreader
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.FAIL)
+ .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ for (int i = msgNum; i < totalMsg - 1; i++) {
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+
+ // 6. Set consumer config to discard if decryption fails
+ consumer.close();
+ consumer = pulsarClient.newConsumer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD)
+ .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+ // Receive should proceed and discard encrypted messages
+ msg = (MessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");
+ }
+
+ @Test
+ public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMeta) {
+ String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+ keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ Producer producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic3")
+ .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic3")
+ .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+
+ String message = "my-message";
+ producer.send(message.getBytes());
+
+ TopicMessageImpl msg = (TopicMessageImpl) consumer.receive(5, TimeUnit.SECONDS);
+
+ String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
+ assertEquals(message, receivedMessage);
+
+ consumer.close();
+ }
+
+ private String decryptMessage(TopicMessageImpl msg, String encryptionKeyName, CryptoKeyReader reader)
+ throws Exception {
+ Optional ctx = msg.getEncryptionCtx();
+ Assert.assertTrue(ctx.isPresent());
+ EncryptionContext encryptionCtx = ctx
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+
+ Map keys = encryptionCtx.getKeys();
+ assertEquals(keys.size(), 1);
+ EncryptionContext.EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+ byte[] dataKey = encryptionKey.getKeyValue();
+ Map metadata = encryptionKey.getMetadata();
+ String version = metadata.get("version");
+ assertEquals(version, "1.0");
+
+ CompressionType compressionType = encryptionCtx.getCompressionType();
+ int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+ byte[] encrParam = encryptionCtx.getParam();
+ String encAlgo = encryptionCtx.getAlgorithm();
+ int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+ ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+ // try to decrypt use default MessageCryptoBc
+ MessageCrypto crypto = new MessageCryptoBc("test", false);
+ PulsarApi.MessageMetadata.Builder metadataBuilder = PulsarApi.MessageMetadata.newBuilder();
+ PulsarApi.EncryptionKeys.Builder encKeyBuilder = PulsarApi.EncryptionKeys.newBuilder();
+ encKeyBuilder.setKey(encryptionKeyName);
+ ByteString keyValue = ByteString.copyFrom(dataKey);
+ encKeyBuilder.setValue(keyValue);
+ PulsarApi.EncryptionKeys encKey = encKeyBuilder.build();
+ metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+ metadataBuilder.setEncryptionAlgo(encAlgo);
+ metadataBuilder.setProducerName("test");
+ metadataBuilder.setSequenceId(123);
+ metadataBuilder.setPublishTime(12333453454L);
+ metadataBuilder.addEncryptionKeys(encKey);
+ metadataBuilder.setCompression(CompressionCodecProvider.convertToWireProtocol(compressionType));
+ metadataBuilder.setUncompressedSize(uncompressedSize);
+ ByteBuf decryptedPayload = crypto.decrypt(() -> metadataBuilder.build(), payloadBuf, reader);
+
+ // try to uncompress
+ CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
+
+ if (batchSize > 0) {
+ PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+ .newBuilder();
+ uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+ singleMessageMetadataBuilder, 0, batchSize);
+ }
+
+ byte[] data = new byte[uncompressedPayload.readableBytes()];
+ uncompressedPayload.readBytes(data);
+ uncompressedPayload.release();
+ return new String(data);
+ }
+
+}
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/client.crt b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.crt
new file mode 100644
index 0000000000000..2d7d156866a86
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDVjCCAj4CCQCtw/UnTFDT7DANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJB
+VTETMBEGA1UECAwKU29tZS1TdGF0ZTEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBmNsaWVu
+dDAeFw0xNjA2MjAwMTQ1NDZaFw0yNjA2MTgwMTQ1NDZaMG0xCzAJBgNVBAYTAkFV
+MRMwEQYDVQQIDApTb21lLVN0YXRlMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxITAf
+BgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqQV5F3Au9FWXIYPdWqiX
+Rk5gdVmVkDuuFK4ZoOd8inoJpB3PPkpmpgoVkKQHDFhgx3ODGWIUgo+n6QDsJxY4
+ygHfVeggQgek8iUfteYVsIcHS0bjkhIij/3ihC301FkiqbrV069oLvUXLKcv3zxG
+mdBAiz0k4xGZhFieVRvQCLY9syUUxmQ/3Cv42lDY8a1gTw4CRRx/hCfDvXCKhOT4
+bMwUIDZfHB3JoDh3Thp8FLz0nTrRF75mSQJ/OdcafIm0Xoz2Otp/CSxLS+U1lLvG
+05crWTDe0om7NW4mK4CqGCFq5gUw7eIzaeO7Q5Qez9XGTMzkgIDTMvNYGGEeJhhm
+NQIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAKXy4g6hljY5MpO8mbZh+uJHq6NEUs
+4dr7OKDDWc39AROZsGf2eFUmHOjmRSw7VHpguGKI+rFRELVffpg/VvMh5apu+DBf
+jhxtDNceAyh5uugPNUJHXyeikBDYW8bAzUU3DmMldPkTZWcGjurmyhDQ1TtK2YJe
+RMFBXw5aAzdJMNi6OfXDH/ZX32hrb482yghDZj+ndnm0FefmLbFTQRMF8/fIHb1W
+kqNHwIaapZwH6j/MJy/TRFYcJunrBUYT9zVjY46k3GU0ex/Bn7T4pg9gzgFGZJhn
+jQQFKliIC84thCzdlPkrLduLY8tmlDKpLXatbEQ+s1MmNOURm6irPp6g
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/client.csr b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.csr
new file mode 100644
index 0000000000000..e01f33ef073f6
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICsjCCAZoCAQAwbTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
+FTATBgNVBAcMDERlZmF1bHQgQ2l0eTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
+DwAwggEKAoIBAQCpBXkXcC70VZchg91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+
+SmamChWQpAcMWGDHc4MZYhSCj6fpAOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOS
+EiKP/eKELfTUWSKputXTr2gu9Rcspy/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTG
+ZD/cK/jaUNjxrWBPDgJFHH+EJ8O9cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEX
+vmZJAn851xp8ibRejPY62n8JLEtL5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt
+4jNp47tDlB7P1cZMzOSAgNMy81gYYR4mGGY1AgMBAAGgADANBgkqhkiG9w0BAQUF
+AAOCAQEAk3eueaq/gonBzKH75oWHlqPbMZQFk4NXqx8h24ZfkCzPEFPyDM+jdQxv
+8vDtyWq+fizqAQmGrM7WPHgnTbmZyovfmwuKwtTlkD/8t7XpTmm9fYspbL4WzdP1
+y8/Vug09te+rni+v+kjk5b9IceEy6kLvXuzirE6c4LunAm+thrr5gWmsx1pyDiq7
+W2M15UZrm/paaCg6cVaMFdXCRZP+g1P4NcgDUe2TyFbLlhOJNtX3DJRZWEhrkEYK
+mRz2tJuiuitCzheAgRrFXepRagHKYffNSas1n/2kIc9QpZ8654kxsAzEwL7CnHd/
+SHbMS9dfP+uM6DACwcvngSOBMJ9KMg==
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/client.key b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.key
new file mode 100644
index 0000000000000..34fc701c5257d
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/client.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCpBXkXcC70VZch
+g91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+SmamChWQpAcMWGDHc4MZYhSCj6fp
+AOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOSEiKP/eKELfTUWSKputXTr2gu9Rcs
+py/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTGZD/cK/jaUNjxrWBPDgJFHH+EJ8O9
+cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEXvmZJAn851xp8ibRejPY62n8JLEtL
+5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt4jNp47tDlB7P1cZMzOSAgNMy81gY
+YR4mGGY1AgMBAAECggEAcJj3yVhvv0/BhY8+CCYl2K1f7u1GCLbpSleNNTbhLbMM
+9yrwo/OWnGg9Y4USOPQrTNOz81X2id+/oSZ/K67PGCvVJ3qi+rny9WkrzdbAfkAF
+6O0Jr4arRbeBjkK7Rjc3M1EHH6VLx3R5AsNBzfpuogss5FVQXICd/5+1oscLeLEx
+/Fn+51IEn9FUg5vr7ElG51f+zPxexcWHLNoqGjTEIGGtI8/CfTzD9tBV4sIjf/Nc
+Zzfs9XYrChfcrS0U1zDa+L7c5gYfoN6M08sBiuZlhyyO9wgzPlp+XnsrSFv6hUta
+0scjAbN4bh+orQn6zgFN/sjkQnraWXW7pKFLyTR/IQKBgQDVju4IbhE9XRweNgXi
+s3BuGV+HsuFffEf0904/zCuCUcScGb5WCz5+KtlFJ//YxfocHVZajH+4GdCGbWim
+m+H3XvRpWgfK/aBNOXu5ueLbnPYyPjTrcpKRsomeoiV+Jz1tv5PQElwzCiCzVvQf
+fMyhQT16YIsFQAGJzQMBEHWODQKBgQDKnKps3sKSR3ycUtIxCVXUir7p52qst0Pm
+bPO8JrcRKZP2z8MJB96+DcQFzrxj7t5DDktkYEsFOPPuIeUsYXsY+MKHs4hEQVCz
+hpDJJNQ8s+SV8TLzKpinZEmLIjslLbn2rQrpqybPg84VxqX3qqM8IrXhMf77aGj6
+QHqvQwHWyQKBgQDF1RVO+9++j82ncvY6z22coKath5leIjxqgtqbISFBJUxUK0j2
+Xo4yxLDnbqmE/8m1V7wSP8tlGYzhquLiTM+kn/Mc0Ukc0503TMQABmJQfXRYkOXn
+IwkCLXltWdoPpnwyeeGNRCTjJ0OpvyiBLtRFobE498xxPZzvMdrRlpS/1QKBgQCo
+wmMleUnBQ2/kWQugMnFeLg6kjs+IesFAnYFKN0kGL4aB7j06OWbrEFY0rCS4bA6O
+9coQGjCCchSjRXI4TB2XCCQnmX8nsuuADNZt45Iv2XrM9XEFn3Y0/tBO5j0zU2nw
+r+NGC/uwns050BMPPf7mqNarctQ6HZZK0wgdEQfoGQKBgC+pbkQv9cn68TsiaJ3w
+tvNRTXCIAAH4Vtn9Cp+63ao+kXn94BJqQF99i58kJpG4ol6wbCHUoC6fHgxUh5HB
+JB0HjC2eCMgn4acAQg0sPW6l35KX36yYxtrL7eosB/yBYum0XAwmboNjEhlCZkOs
+YOpSsn61g7xqqrt40Spb5vUn
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
new file mode 100644
index 0000000000000..58ab3d4ff9be8
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-ecdsa.pem
@@ -0,0 +1,13 @@
+-----BEGIN EC PARAMETERS-----
+MIGXAgEBMBwGByqGSM49AQECEQD////9////////////////MDsEEP////3/////
+//////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2
+eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1
+ow0bkDihFQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIHYAgEBBBDeu9hc8kOvL3pl+LYSjLq9oIGaMIGXAgEBMBwGByqGSM49AQECEQD/
+///9////////////////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZ
+PCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bP
+Wsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1ow0bkDihFQIBAaEkAyIABOsqPpE8
+cY80pxkog5xw3i2AQ0yfV3MqMusxlOQnigBp
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000000000..3e2831ae8a398
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-mismatch-rsa.pem
@@ -0,0 +1,29 @@
+-----BEGIN EC PARAMETERS-----
+MIIBwgIBATBNBgcqhkjOPQEBAkIB////////////////////////////////////
+//////////////////////////////////////////////////8wgZ4EQgH/////
+////////////////////////////////////////////////////////////////
+/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh
+Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5
+MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte
+d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY
+9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC
+QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3
+CaXQO7XJuImcR667b7cekThkCQIBAQ==
+-----END EC PARAMETERS-----
+-----BEGIN EC PRIVATE KEY-----
+MIICnQIBAQRCAeNLEp1HefZ1nMl5vvgFMsJCd5ieCWqPT7TXbQkn27A8WkyAGTYC
+GtolyPokOgSjbJh+ofBt/MgvE/nMrqzmkZVtoIIBxjCCAcICAQEwTQYHKoZIzj0B
+AQJCAf//////////////////////////////////////////////////////////
+////////////////////////////MIGeBEIB////////////////////////////
+//////////////////////////////////////////////////////////wEQVGV
+PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1
+c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3
+BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB
+hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu
+cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB////////////////////
+///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4
+ZAkCAQGhgYkDgYYABAFhUHeaHfIWre/pPmv2a2l891co79dFpg6ixPRg+Y5qe0C7
+src//LT/ZR5rgj8ne+YcaIlwyQRl5OYEd25n799IcgHIBTGyaLB6Td5mW/oWT/Fz
+soufOnUJ7O/kDHjIQ15sczk3rDhe8/mB9zPjKlKTuAl5jBEt6E3yiB44Dtng02xD
+uQ==
+-----END EC PRIVATE KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-rsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
new file mode 100644
index 0000000000000..a0d589e0e2a4c
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/private-key.client-rsa.pem
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAtKWwgqdnTYrOCv+j1MkTWfSH0wCsHZZca9wAW3qP4uuhlBvn
+b10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhURHM4yuXqzCxzuAwkQSo39rzX8PGC
+7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9ER1e5O/uEPAotP933hHQ0J2hMEek
+HqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+cMd8ZbGScPqI9M38bmn3OLxFTn1vt
+hpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs7ppZnRbv8Hc/DFdOKVQIgam6CDdn
+NKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQGQIDAQABAoIBAAaJFAi2C7u3cNrf
+AstY9vVDLoLIvHFZlkBktjKZDYmVIsRb+hSCViwVUrWLL67R6+Iv4eg4DeTOAx00
+8pncXKgZTw2wIb1/QjR/Y/RjlaC8lkdmRWli7udMQCZVsyhuSjW6Pj7vr8YE4woj
+FhNijxEGcf9wWrmMJrzdnTWQiXByo+eTvUQ9BPgPGrRjsMZmTkLyAVJff2DfxO5b
+IWFDYDJcyYAMCIMQu7vys/I50ou6ilb1CO6QM6Z7KpPeOoVFPwtzbh8cf9xM8UNS
+j6J/JmdWhgI34GS3NA68xTQ6PV7zjnhCc+iccm3JKyzGXwaApAZ+Eoce/9j4WKmu
+5B4ziR0CgYEA3l/9OHbl1zmyV+rRxWOIj/i2rTvHzwBnbnPJyuemL5VMFdpGodQ3
+vwHvyQmcECRVRxmXojQ4QuPPHs3qp6wEEFPCWxChLSTxlUc85SOFHWU2O99jV7zI
+7+JOpDK/Mstsx9nHgXduJF+glTFtA3LH8Oqylzu2aFPsprwKuZf94Q8CgYEAz/Zx
+akEG+PEMtP5YS28cX5XfjsIX/V26Fs6/sH16QjUIEddE5T4fCuokxCjSiwUcWhml
+pHEJ5S5xp3VYRfISW3jRW3qstIH1tpZipB6+S0zTuJmLJbA3IiWEg2rtMt7X1uJv
+A/bYOqe0hOPTuXuZdtVZ0nMTKk7GG8O6VkBI7FcCgYEAkDfCmscJgs7JahlBWHmX
+zH9pwem+SPKjIc/4NB6N+dgikx2Pp05hpP/VihUwYIufvs/LNogVYNQrtHepUnrN
+2+TmbHbZgNSv1Ldxt82UfB7y0FutKu6lhmXHyNecho3Fi8sih0V0aiSWmYuHfrAH
+GaiskEZKo1iiZvQXJIx9O2MCgYATBf0r9hTYMtyxtc6H3/sdd01C9thQ8gDy0yjP
+0Tqc0dMSJroDqmIWkoKYew9/bhFA4LW5TCnWkCAPbHmNtG4fdfbYwmkH/hdnA2y0
+jKdlpfp8GXeUFAGHGx17FA3sqFvgKUh0eWEgRHUL7vdQMVFBgJS93o7zQM94fLgP
+6cOB8wKBgFcGV4GjI2Ww9cillaC554MvoSjf8B/+04kXzDOh8iYIIzO9EUil1jjK
+Jvxp4hnLzTKWbux3MEWqurLkYas6GpKBjw+iNOCar6YdqWGVqM3RUx7PTUaZwkKx
+UdP63IfY7iZCIT/QbyHQvIUe2MaiVnH+ulxdkK6Y5e7gxcbckIH4
+-----END RSA PRIVATE KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
new file mode 100644
index 0000000000000..5aeb429a3d695
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-ecdsa.pem
@@ -0,0 +1,7 @@
+-----BEGIN PUBLIC KEY-----
+MIHKMIGjBgcqhkjOPQIBMIGXAgEBMBwGByqGSM49AQECEQD////9////////////
+////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1N
+aW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd
+7XqDAhEA/////gAAAAB1ow0bkDihFQIBAQMiAATrKj6RPHGPNKcZKIOccN4tgENM
+n1dzKjLrMZTkJ4oAaQ==
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
new file mode 100644
index 0000000000000..6fc427b1b19dc
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-mismatch-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-rsa.pem b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
new file mode 100644
index 0000000000000..6fc427b1b19dc
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/public-key.client-rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
+WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
+RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
+ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
+Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
+7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
+GQIDAQAB
+-----END PUBLIC KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/server.crt b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.crt
new file mode 100644
index 0000000000000..59b651be2a406
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.crt
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDLjCCAhYCCQDn/Yvym+FMsDANBgkqhkiG9w0BAQUFADBZMQswCQYDVQQGEwJB
+VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMRIwEAYDVQQDEwlsb2NhbGhvc3QwHhcNMTYwNjEzMjIyMTQ2WhcN
+MjYwNjExMjIyMTQ2WjBZMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0
+ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRIwEAYDVQQDEwls
+b2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCs29IuzZvk
+OGUkS/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du99
+8xbgeihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT
+1DepD/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQr
+mTbLzzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J
+92K+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNb
+O9hVS4bqqsw9AgMBAAEwDQYJKoZIhvcNAQEFBQADggEBACE0WBuTbHcPtYKv2ZMS
+mYk9jvtAhmWHQ6tNqV8CmS2AsrzZdWglGaqIRsm5slkD2BGeQS+BesTArUuENTmP
+r9kJSecdiiB8aWtLbhoCSH3QR6IW/b5UVl6sR5OIh7SkNTjMSUSDnMEVLNGyKZGS
+gCGVbDf3n5KhOTnwqguELRykynKFt2LVksBia9+88lUtiRHpbyClo/KVWltJlaww
+PT0WEpwqVUcHmwrR3MTzJDEPvIplSgxdaDmFGYS1YKm9T/wQd+t/0DbXMmfJXBbd
+FVUnB6o7qJVU9N2Tbaj9NbCtwz5nTZG4A5kRXWHVjZsn5WzLuS/me3rDXjwlfB2p
+ipY=
+-----END CERTIFICATE-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/server.csr b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.csr
new file mode 100644
index 0000000000000..8782222c5ab46
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.csr
@@ -0,0 +1,17 @@
+-----BEGIN CERTIFICATE REQUEST-----
+MIICnjCCAYYCAQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUx
+ITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAxMJbG9j
+YWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArNvSLs2b5Dhl
+JEv8Kis3f4dnrKowko8OEiy23odRgN1BL4dZPfnkYmMCRtcgjh5nLKeAPA7vffMW
+4HooaVlsO6Yy+VVZMt3Irrx8s9QnOmykabPxs1SQcBT9IMx5yvG1DPOlOdshk9Q3
+qQ/2NpVEctIoHVXADIxsHjMKE5omlXMGXsX2LhHn/o8935qZjsD/a26CAPa0K5k2
+y885GwMGV3Zfm0byHnyIyGsyM0zrPLSVq4RWkyVIAYYS4xSME3aUvp/hajrvCfdi
+vj7yoyHid5Jp2ckRyu7onZQJ9IPwyvnKNbF98zkTwlnoNtSvv71R04/dcwETWzvY
+VUuG6qrMPQIDAQABoAAwDQYJKoZIhvcNAQEFBQADggEBAEPHySnpf3E/7tZsiDka
+rqdB/sU7fdqjyV0iy0cuKQkU8WYrsE7bHkqMYc8CiIDfWhIGW5Jnzups2O6eH0Sx
+2BS21ARFiNGC1UfY1HSV2zrTNh3RqQa3YsXzv9vvdQ/gjsqGDuGDIc1yAA+Ytdja
+3rhIzEVqBhiLzg+M2+gW1zs+Kqj0Zo0pLB2uqhdZJmjxBb2FCli50vCVEhqIS3RO
+KTE+AJfxThWIeahFyVaskaEGkS6NVr2JihV0elbKolH19k2UzRTVn7p3Ixh5ojuW
+gtU/90vOy/SDkSRmCWMqgkUKJ2oeImleHdrvwNyrzvrLWRAz6R5yGQJwji9kKpHD
+FK0=
+-----END CERTIFICATE REQUEST-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/certificate/server.key b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.key
new file mode 100644
index 0000000000000..6da70f5aec3b5
--- /dev/null
+++ b/tests/pulsar-client-shade-test/src/test/resources/certificate/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCs29IuzZvkOGUk
+S/wqKzd/h2esqjCSjw4SLLbeh1GA3UEvh1k9+eRiYwJG1yCOHmcsp4A8Du998xbg
+eihpWWw7pjL5VVky3ciuvHyz1Cc6bKRps/GzVJBwFP0gzHnK8bUM86U52yGT1Dep
+D/Y2lURy0igdVcAMjGweMwoTmiaVcwZexfYuEef+jz3fmpmOwP9rboIA9rQrmTbL
+zzkbAwZXdl+bRvIefIjIazIzTOs8tJWrhFaTJUgBhhLjFIwTdpS+n+FqOu8J92K+
+PvKjIeJ3kmnZyRHK7uidlAn0g/DK+co1sX3zORPCWeg21K+/vVHTj91zARNbO9hV
+S4bqqsw9AgMBAAECggEAd/LuDeZFZ/+uR5qmuAhXMZqfWZSbsges5vW6S/6wkvB1
+vGp6heQzFAbKXKgJgjUcuULeXE6s58RYuppqEnin/1hcBOKxy/dUu9Q14H+2XPdo
+u6TPcvaaZ/xYjnr1hNtnHD6yB8zEpxVbLmjSHJxF7Dti9MA9TTfgCrC2LFYKsicD
+/5AQyHuwpHyTL3Iiwv4Qtks/SD2a3fu8lD0yTQwA/hY6/0ieXxXd9tZV5a6GSA0P
+nieol1byfuX7Q5fb8ggPd9u9K1mVZTBRKiE5w+uU4Ic2IkBmZX5ZuRS+vFplpLsY
+YpFPvzFmpNkpK2SdYjJ+V4tkJsFHmOaFRgW/0QB2DQKBgQDeQMSZBQlPUrgRdWHN
+OyvTcrSvXzg5DbaIj39tgdNZ6PYns/thD0n707KGRJOChIyYiiKxLxzLWdPUxqQO
+rNLUV9IkMVc/QZR8RUqGc2BxmPOxAprhzeOhLsyqP/sgtxRHAnLqmkXuHYoxvTZ6
+LFCRCZBpEJrutGxl3s/x+sfkuwKBgQDHGwnSmvArpL8ZY1dV4xKNkxifCBnNmqAl
+TKHPW3odN9nkMECEt1XUIioUUKXUsiAZNp5xa/v1DEyJ4f2T20QKcAGbS18b1M5W
+axIoH3IhyLo74tuo0fthgq5bzypfFOlIjo7F9mpEky/461RWmoNAAlp9+FkDi48C
+KwjAk39/ZwKBgQDXFJqs8sDFsOlMi+nvsHmDERhmNqG0JN8mXKgWk3KzKc09MuHs
+Vd1lBMNZSHfv8NIWtGdKTKty5yUmXm1ZfkoxECPevpkOMCq/8FZksrb8d+YswLae
+Gp9U1nNdtrkSOdo3tdj7y/wsqQ2ZgOB9bvEwyq6j3lvw8U2NcAiQxf44DQKBgBHb
+lPf0uZHQhutKA61KXoGgLdclrNrKAY8W3nRwqfUw6zQSN9cvcl1Cay/DQ/xdtY9N
+XMyjeMezwLGlOU8nnWSqQxqgmfkvDwqlM82xdFUfYcS5RiZQHxHR3L2TSSOaBoph
+buDGhyV7ZhQXV0slNJxrGZ6uxZ0RyVPSdEiBcjAFAoGBAJqZ6uCVHpv/FwZVggu7
+Xb9EIxZnLSmXwaXFpJoMZpRpKb8cSTTJbgSMv3Dq2LcNKYXdNBhgKgPSc/XipXt9
+ZdT36KWipV+PzW691kUiWHtA8/+E0LCi4Y7rlcBMz9PgDNXK4XMMZOVKxDqPcHSJ
+P6y01ku7T2X+abUiJ334Hg6G
+-----END PRIVATE KEY-----
diff --git a/tests/pulsar-client-shade-test/src/test/resources/pulsar.xml b/tests/pulsar-client-shade-test/src/test/resources/pulsar.xml
index 4f2086897648f..df07642a01851 100644
--- a/tests/pulsar-client-shade-test/src/test/resources/pulsar.xml
+++ b/tests/pulsar-client-shade-test/src/test/resources/pulsar.xml
@@ -25,6 +25,7 @@
+