Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End to End Encryption Support - Java client #731

Merged
merged 2 commits into from
Sep 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ flexible messaging model and an intuitive client API.</description>
<aspectj.version>1.8.9</aspectj.version>
<rocksdb.version>5.5.5</rocksdb.version>
<slf4j.version>1.7.25</slf4j.version>
<bouncycastle.version>1.55</bouncycastle.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -523,6 +524,11 @@ flexible messaging model and an intuitive client API.</description>
<version>1.0.2</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -2159,4 +2164,274 @@ public void testFailReceiveAsyncOnConsumerClose() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test(groups = "encryption")
public void testECDSAEncryption() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

@Override
public byte[] getPublicKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}

@Override
public byte[] getPrivateKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}
}

final int totalMsg = 10;

Set<String> messageSet = Sets.newHashSet();
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
conf.setCryptoKeyReader(new EncKeyReader());
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myecdsa-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.addEncryptionKey("client-ecdsa.pem");
producerConf.setCryptoKeyReader(new EncKeyReader());

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myecdsa-topic1", producerConf);
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

Message msg = null;

for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

@Test(groups = "encryption")
public void testRSAEncryption() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

@Override
public byte[] getPublicKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}

@Override
public byte[] getPrivateKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
}
} else {
Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable.");
}
return null;
}
}

final int totalMsg = 10;

Set<String> messageSet = Sets.newHashSet();
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
conf.setCryptoKeyReader(new EncKeyReader());
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myrsa-topic1", "my-subscriber-name",
conf);

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.addEncryptionKey("client-rsa.pem");
producerConf.setCryptoKeyReader(new EncKeyReader());

Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myrsa-topic1", producerConf);
Producer producer2 = pulsarClient.createProducer("persistent://my-property/use/my-ns/myrsa-topic1", producerConf);
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
for (int i = totalMsg; i < totalMsg*2; i++) {
String message = "my-message-" + i;
producer2.send(message.getBytes());
}

Message msg = null;

for (int i = 0; i < totalMsg*2; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}

@Test(groups = "encryption")
public void testEncryptionFailure() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

@Override
public byte[] getPublicKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}

@Override
public byte[] getPrivateKey(String keyName) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
return Files.readAllBytes(Paths.get(CERT_FILE_PATH));
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}
}

final int totalMsg = 10;

ProducerConfiguration producerConf = new ProducerConfiguration();

Message msg = null;
Set<String> messageSet = Sets.newHashSet();
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
conf);

// 1. Invalid key name
producerConf.addEncryptionKey("client-non-existant-rsa.pem");
producerConf.setCryptoKeyReader(new EncKeyReader());

try {
Producer producer = pulsarClient.createProducer("persistent://my-property/use/myenc-ns/myenc-topic1", producerConf);
Assert.fail("Producer creation should not suceed if failing to read key");
} catch (Exception e) {
// ok
}

// 2. Producer with valid key name
producerConf = new ProducerConfiguration();
producerConf.setCryptoKeyReader(new EncKeyReader());
producerConf.addEncryptionKey("client-rsa.pem");
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1", producerConf);

for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

// 3. KeyReder is not set by consumer
// Receive should fail since key reader is not setup
msg = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(msg, "Receive should have failed with no keyreader");

// 4. Set consumer config to consume even if decryption fails
conf.setCryptoFailureAction(ConsumerCryptoFailureAction.CONSUME);
consumer.close();
consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
conf);

int msgNum = 0;
try {
// Receive should proceed and deliver encrypted message
msg = consumer.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + msgNum++;
Assert.assertNotEquals(receivedMessage, expectedMessage,
"Received encrypted message " + receivedMessage + " should not match the expected message " + expectedMessage);
consumer.acknowledgeCumulative(msg);
} catch (Exception e) {
Assert.fail("Failed to receive message even aftet ConsumerCryptoFailureAction.CONSUME is set.");
}

// 5. Set keyreader and failure action
conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
consumer.close();
// Set keyreader
conf.setCryptoKeyReader(new EncKeyReader());
consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name",
conf);

for (int i = msgNum; i < totalMsg-1; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();

//6. Set consumer config to discard if decryption fails
consumer.close();
ConsumerConfiguration conf2 = new ConsumerConfiguration();
conf2.setSubscriptionType(SubscriptionType.Exclusive);
conf2.setCryptoFailureAction(ConsumerCryptoFailureAction.DISCARD);
consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/myenc-topic1", "my-subscriber-name", conf2);

// Receive should proceed and discard encrypted messages
msg = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD is set.");

log.info("-- Exiting {} test --", methodName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-----BEGIN EC PARAMETERS-----
MIGXAgEBMBwGByqGSM49AQECEQD////9////////////////MDsEEP////3/////
//////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2
eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1
ow0bkDihFQIBAQ==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MIHYAgEBBBDeu9hc8kOvL3pl+LYSjLq9oIGaMIGXAgEBMBwGByqGSM49AQECEQD/
///9////////////////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZ
PCzuXtMDFQAADg1NaW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bP
Wsg5W6/rE8AtopLd7XqDAhEA/////gAAAAB1ow0bkDihFQIBAaEkAyIABOsqPpE8
cY80pxkog5xw3i2AQ0yfV3MqMusxlOQnigBp
-----END EC PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-----BEGIN EC PARAMETERS-----
MIIBwgIBATBNBgcqhkjOPQEBAkIB////////////////////////////////////
//////////////////////////////////////////////////8wgZ4EQgH/////
////////////////////////////////////////////////////////////////
/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh
Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5
MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte
d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY
9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC
QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3
CaXQO7XJuImcR667b7cekThkCQIBAQ==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MIICnQIBAQRCAeNLEp1HefZ1nMl5vvgFMsJCd5ieCWqPT7TXbQkn27A8WkyAGTYC
GtolyPokOgSjbJh+ofBt/MgvE/nMrqzmkZVtoIIBxjCCAcICAQEwTQYHKoZIzj0B
AQJCAf//////////////////////////////////////////////////////////
////////////////////////////MIGeBEIB////////////////////////////
//////////////////////////////////////////////////////////wEQVGV
PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1
c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3
BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB
hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu
cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB////////////////////
///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4
ZAkCAQGhgYkDgYYABAFhUHeaHfIWre/pPmv2a2l891co79dFpg6ixPRg+Y5qe0C7
src//LT/ZR5rgj8ne+YcaIlwyQRl5OYEd25n799IcgHIBTGyaLB6Td5mW/oWT/Fz
soufOnUJ7O/kDHjIQ15sczk3rDhe8/mB9zPjKlKTuAl5jBEt6E3yiB44Dtng02xD
uQ==
-----END EC PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAtKWwgqdnTYrOCv+j1MkTWfSH0wCsHZZca9wAW3qP4uuhlBvn
b10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhURHM4yuXqzCxzuAwkQSo39rzX8PGC
7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9ER1e5O/uEPAotP933hHQ0J2hMEek
HqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+cMd8ZbGScPqI9M38bmn3OLxFTn1vt
hpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs7ppZnRbv8Hc/DFdOKVQIgam6CDdn
NKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQGQIDAQABAoIBAAaJFAi2C7u3cNrf
AstY9vVDLoLIvHFZlkBktjKZDYmVIsRb+hSCViwVUrWLL67R6+Iv4eg4DeTOAx00
8pncXKgZTw2wIb1/QjR/Y/RjlaC8lkdmRWli7udMQCZVsyhuSjW6Pj7vr8YE4woj
FhNijxEGcf9wWrmMJrzdnTWQiXByo+eTvUQ9BPgPGrRjsMZmTkLyAVJff2DfxO5b
IWFDYDJcyYAMCIMQu7vys/I50ou6ilb1CO6QM6Z7KpPeOoVFPwtzbh8cf9xM8UNS
j6J/JmdWhgI34GS3NA68xTQ6PV7zjnhCc+iccm3JKyzGXwaApAZ+Eoce/9j4WKmu
5B4ziR0CgYEA3l/9OHbl1zmyV+rRxWOIj/i2rTvHzwBnbnPJyuemL5VMFdpGodQ3
vwHvyQmcECRVRxmXojQ4QuPPHs3qp6wEEFPCWxChLSTxlUc85SOFHWU2O99jV7zI
7+JOpDK/Mstsx9nHgXduJF+glTFtA3LH8Oqylzu2aFPsprwKuZf94Q8CgYEAz/Zx
akEG+PEMtP5YS28cX5XfjsIX/V26Fs6/sH16QjUIEddE5T4fCuokxCjSiwUcWhml
pHEJ5S5xp3VYRfISW3jRW3qstIH1tpZipB6+S0zTuJmLJbA3IiWEg2rtMt7X1uJv
A/bYOqe0hOPTuXuZdtVZ0nMTKk7GG8O6VkBI7FcCgYEAkDfCmscJgs7JahlBWHmX
zH9pwem+SPKjIc/4NB6N+dgikx2Pp05hpP/VihUwYIufvs/LNogVYNQrtHepUnrN
2+TmbHbZgNSv1Ldxt82UfB7y0FutKu6lhmXHyNecho3Fi8sih0V0aiSWmYuHfrAH
GaiskEZKo1iiZvQXJIx9O2MCgYATBf0r9hTYMtyxtc6H3/sdd01C9thQ8gDy0yjP
0Tqc0dMSJroDqmIWkoKYew9/bhFA4LW5TCnWkCAPbHmNtG4fdfbYwmkH/hdnA2y0
jKdlpfp8GXeUFAGHGx17FA3sqFvgKUh0eWEgRHUL7vdQMVFBgJS93o7zQM94fLgP
6cOB8wKBgFcGV4GjI2Ww9cillaC554MvoSjf8B/+04kXzDOh8iYIIzO9EUil1jjK
Jvxp4hnLzTKWbux3MEWqurLkYas6GpKBjw+iNOCar6YdqWGVqM3RUx7PTUaZwkKx
UdP63IfY7iZCIT/QbyHQvIUe2MaiVnH+ulxdkK6Y5e7gxcbckIH4
-----END RSA PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-----BEGIN PUBLIC KEY-----
MIHKMIGjBgcqhkjOPQIBMIGXAgEBMBwGByqGSM49AQECEQD////9////////////
////MDsEEP////3///////////////wEEOh1ecEQefQ92CSZPCzuXtMDFQAADg1N
aW5naHVhUXUMwDpEc9A2eQQhBBYf91KLiZstDChgfKUsW4bPWsg5W6/rE8AtopLd
7XqDAhEA/////gAAAAB1ow0bkDihFQIBAQMiAATrKj6RPHGPNKcZKIOccN4tgENM
n1dzKjLrMZTkJ4oAaQ==
-----END PUBLIC KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtKWwgqdnTYrOCv+j1MkT
WfSH0wCsHZZca9wAW3qP4uuhlBvnb10JcFf5ZjzP9BSXK+tHmI8uoN368vEv6yhU
RHM4yuXqzCxzuAwkQSo39rzX8PGC7qdjCN7LDJ3MnqiBIrUsSaEP1wrNsB1kI+o9
ER1e5O/uEPAotP933hHQ0J2hMEekHqL7sBlJ98h6NmsicEaUkardk0TOXrlkjC+c
Md8ZbGScPqI9M38bmn3OLxFTn1vthpvnXLvCmG4M+6xtYtD+npcVPZw1i1R90fMs
7ppZnRbv8Hc/DFdOKVQIgam6CDdnNKgW7c7IBMrP0AEm37HTu0LSOjP2OHXlvvlQ
GQIDAQAB
-----END PUBLIC KEY-----
Loading