Skip to content

Commit 9dc0c0b

Browse files
authored
Merge pull request #350 from zigarn/truststore-type
Handle other type of truststore
2 parents 9751f13 + 30625e3 commit 9dc0c0b

File tree

9 files changed

+54
-9
lines changed

9 files changed

+54
-9
lines changed

src/main/java/com/splunk/hecclient/Hec.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
287287
}
288288

289289
// Code block for custom keystore client construction
290-
SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStorePassword());
290+
SSLContext context = loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());
291291

292292
if (context != null) {
293293
return new HttpClientBuilder()
@@ -309,16 +309,17 @@ public static CloseableHttpClient createHttpClient(final HecConfig config) {
309309
* a Hec Client with custom key store functionality.
310310
*
311311
* @param path A file path to the custom key store to be used.
312+
* @param type The type of the key store file.
312313
* @param pass The password for the key store file.
313314
* @since 1.1.0
314315
* @throws HecException
315316
* @return A configured SSLContect to be used in a CloseableHttpClient
316317
* @see KeyStore
317318
* @see SSLContext
318319
*/
319-
public static SSLContext loadCustomSSLContext(String path, String pass) {
320+
public static SSLContext loadCustomSSLContext(String path, String type, String pass) {
320321
try {
321-
KeyStore ks = KeyStore.getInstance("JKS");
322+
KeyStore ks = KeyStore.getInstance(type);
322323
FileInputStream fileInputStream = new FileInputStream(path);
323324
ks.load(fileInputStream, pass.toCharArray());
324325

src/main/java/com/splunk/hecclient/HecConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class HecConfig {
3434
private boolean enableChannelTracking = false;
3535
private boolean hasCustomTrustStore = false;
3636
private String trustStorePath;
37+
private String trustStoreType = "JKS";
3738
private String trustStorePassword;
3839
private int lbPollInterval = 120; // in seconds
3940
private String kerberosPrincipal;
@@ -104,6 +105,8 @@ public int getBackoffThresholdSeconds() {
104105

105106
public String getTrustStorePath() { return trustStorePath; }
106107

108+
public String getTrustStoreType() { return trustStoreType; }
109+
107110
public String getTrustStorePassword() { return trustStorePassword; }
108111

109112
public HecConfig setDisableSSLCertVerification(boolean disableVerfication) {
@@ -161,6 +164,11 @@ public HecConfig setTrustStorePath(String path) {
161164
return this;
162165
}
163166

167+
public HecConfig setTrustStoreType(String type) {
168+
trustStoreType = type;
169+
return this;
170+
}
171+
164172
public HecConfig setTrustStorePassword(String pass) {
165173
trustStorePassword = pass;
166174
return this;

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
7474
static final String HEC_EVENT_FORMATTED_CONF = "splunk.hec.json.event.formatted";
7575
// Trust store
7676
static final String SSL_TRUSTSTORE_PATH_CONF = "splunk.hec.ssl.trust.store.path";
77+
static final String SSL_TRUSTSTORE_TYPE_CONF = "splunk.hec.ssl.trust.store.type";
7778
static final String SSL_TRUSTSTORE_PASSWORD_CONF = "splunk.hec.ssl.trust.store.password";
7879
//Headers
7980
static final String HEADER_SUPPORT_CONF = "splunk.header.support";
@@ -178,6 +179,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
178179
+ "correctly by Splunk.";
179180
// TBD
180181
static final String SSL_TRUSTSTORE_PATH_DOC = "Path on the local disk to the certificate trust store.";
182+
static final String SSL_TRUSTSTORE_TYPE_DOC = "Type of the trust store (JKS, PKCS12, ...).";
181183
static final String SSL_TRUSTSTORE_PASSWORD_DOC = "Password for the trust store.";
182184

183185
static final String HEADER_SUPPORT_DOC = "Setting will enable Kafka Record headers to be used for meta data override";
@@ -236,6 +238,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
236238

237239
final boolean hasTrustStorePath;
238240
final String trustStorePath;
241+
final String trustStoreType;
239242
final String trustStorePassword;
240243

241244
final boolean headerSupport;
@@ -265,6 +268,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
265268
validateCertificates = getBoolean(SSL_VALIDATE_CERTIFICATES_CONF);
266269
trustStorePath = getString(SSL_TRUSTSTORE_PATH_CONF);
267270
hasTrustStorePath = StringUtils.isNotBlank(trustStorePath);
271+
trustStoreType = getString(SSL_TRUSTSTORE_TYPE_CONF);
268272
trustStorePassword = getPassword(SSL_TRUSTSTORE_PASSWORD_CONF).value();
269273
validateHttpsConfig(splunkURI);
270274
eventBatchTimeout = getInt(EVENT_TIMEOUT_CONF);
@@ -318,6 +322,7 @@ public static ConfigDef conf() {
318322
.define(HTTP_KEEPALIVE_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, HTTP_KEEPALIVE_DOC)
319323
.define(SSL_VALIDATE_CERTIFICATES_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, SSL_VALIDATE_CERTIFICATES_DOC)
320324
.define(SSL_TRUSTSTORE_PATH_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PATH_DOC)
325+
.define(SSL_TRUSTSTORE_TYPE_CONF, ConfigDef.Type.STRING, "JKS", ConfigDef.Importance.LOW, SSL_TRUSTSTORE_TYPE_DOC)
321326
.define(SSL_TRUSTSTORE_PASSWORD_CONF, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SSL_TRUSTSTORE_PASSWORD_DOC)
322327
.define(EVENT_TIMEOUT_CONF, ConfigDef.Type.INT, 300, ConfigDef.Importance.MEDIUM, EVENT_TIMEOUT_DOC)
323328
.define(ACK_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 10, ConfigDef.Importance.MEDIUM, ACK_POLL_INTERVAL_DOC)
@@ -368,6 +373,7 @@ public HecConfig getHecConfig() {
368373
.setEnableChannelTracking(trackData)
369374
.setBackoffThresholdSeconds(backoffThresholdSeconds)
370375
.setTrustStorePath(trustStorePath)
376+
.setTrustStoreType(trustStoreType)
371377
.setTrustStorePassword(trustStorePassword)
372378
.setHasCustomTrustStore(hasTrustStorePath)
373379
.setKerberosPrincipal(kerberosUserPrincipal)
@@ -393,6 +399,7 @@ public String toString() {
393399
+ "httpKeepAlive:" + httpKeepAlive + ", "
394400
+ "validateCertificates:" + validateCertificates + ", "
395401
+ "trustStorePath:" + trustStorePath + ", "
402+
+ "trustStoreType:" + trustStoreType + ", "
396403
+ "socketTimeout:" + socketTimeout + ", "
397404
+ "eventBatchTimeout:" + eventBatchTimeout + ", "
398405
+ "ackPollInterval:" + ackPollInterval + ", "
@@ -544,4 +551,4 @@ private static boolean getNamedGroupCandidates(String regex) {
544551
}
545552
return false;
546553
}
547-
}
554+
}

src/test/java/com/splunk/hecclient/HecConfigTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public void getterSetter() {
4444
.setEnableChannelTracking(true)
4545
.setEventBatchTimeout(7)
4646
.setTrustStorePath("test")
47+
.setTrustStoreType("PKCS12")
4748
.setTrustStorePassword("pass")
4849
.setHasCustomTrustStore(true)
4950
.setBackoffThresholdSeconds(10)
@@ -60,6 +61,7 @@ public void getterSetter() {
6061
Assert.assertEquals(6, config.getAckPollThreads());
6162
Assert.assertEquals(7, config.getEventBatchTimeout());
6263
Assert.assertEquals("test", config.getTrustStorePath());
64+
Assert.assertEquals("PKCS12", config.getTrustStoreType());
6365
Assert.assertEquals("pass", config.getTrustStorePassword());
6466
Assert.assertEquals(10000, config.getBackoffThresholdSeconds());
6567
Assert.assertEquals(120000, config.getlbPollInterval());

src/test/java/com/splunk/hecclient/HttpClientBuilderTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,19 @@ public void buildSecureCustomKeystore() {
5252
.setSocketSendBufferSize(1024)
5353
.setSocketTimeout(120)
5454
.setDisableSSLCertVerification(false)
55-
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks","Notchangeme"))
55+
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.jks", "JKS", "Notchangeme"))
56+
.build();
57+
Assert.assertNotNull(client);
58+
}
59+
@Test
60+
public void buildSecureCustomKeystorePkcs12() {
61+
HttpClientBuilder builder = new HttpClientBuilder();
62+
CloseableHttpClient client = builder.setMaxConnectionPoolSizePerDestination(1)
63+
.setMaxConnectionPoolSize(2)
64+
.setSocketSendBufferSize(1024)
65+
.setSocketTimeout(120)
66+
.setDisableSSLCertVerification(false)
67+
.setSslContext(Hec.loadCustomSSLContext("./src/test/resources/keystoretest.p12", "PKCS12", "Notchangeme"))
5668
.build();
5769
Assert.assertNotNull(client);
5870
}
@@ -63,4 +75,4 @@ public void buildDefault() {
6375
CloseableHttpClient client = builder.build();
6476
Assert.assertNotNull(client);
6577
}
66-
}
78+
}

src/test/java/com/splunk/kafka/connect/ConfigProfile.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ConfigProfile {
1717
private boolean validateCertificates;
1818
private boolean hasTrustStorePath;
1919
private String trustStorePath;
20+
private String trustStoreType;
2021
private String trustStorePassword;
2122
private int eventBatchTimeout;
2223
private int ackPollInterval;
@@ -77,6 +78,7 @@ public ConfigProfile buildProfileDefault() {
7778
this.validateCertificates = true;
7879
this.hasTrustStorePath = true;
7980
this.trustStorePath = "./src/test/resources/keystoretest.jks";
81+
this.trustStoreType = "JKS";
8082
this.trustStorePassword = "Notchangeme";
8183
this.eventBatchTimeout = 1;
8284
this.ackPollInterval = 1;
@@ -110,7 +112,8 @@ public ConfigProfile buildProfileOne() {
110112
this.httpKeepAlive = true;
111113
this.validateCertificates = true;
112114
this.hasTrustStorePath = true;
113-
this.trustStorePath = "./src/test/resources/keystoretest.jks";
115+
this.trustStorePath = "./src/test/resources/keystoretest.p12";
116+
this.trustStoreType = "PKCS12";
114117
this.trustStorePassword = "Notchangeme";
115118
this.eventBatchTimeout = 1;
116119
this.ackPollInterval = 1;
@@ -332,6 +335,14 @@ public void setTrustStorePath(String trustStorePath) {
332335
this.trustStorePath = trustStorePath;
333336
}
334337

338+
public String getTrustStoreType() {
339+
return trustStoreType;
340+
}
341+
342+
public void setTrustStoreType(String trustStoreType) {
343+
this.trustStoreType = trustStoreType;
344+
}
345+
335346
public String getTrustStorePassword() {
336347
return trustStorePassword;
337348
}
@@ -461,6 +472,6 @@ public void setHeaderHost(String headerHost) {
461472
}
462473

463474
@Override public String toString() {
464-
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
475+
return "ConfigProfile{" + "topics='" + topics + '\'' + ", topics.regex='" + topicsRegex + '\'' + ", token='" + token + '\'' + ", uri='" + uri + '\'' + ", raw=" + raw + ", ack=" + ack + ", indexes='" + indexes + '\'' + ", sourcetypes='" + sourcetypes + '\'' + ", sources='" + sources + '\'' + ", httpKeepAlive=" + httpKeepAlive + ", validateCertificates=" + validateCertificates + ", hasTrustStorePath=" + hasTrustStorePath + ", trustStorePath='" + trustStorePath + '\'' + ", trustStoreType='" + trustStoreType + '\'' + ", trustStorePassword='" + trustStorePassword + '\'' + ", eventBatchTimeout=" + eventBatchTimeout + ", ackPollInterval=" + ackPollInterval + ", ackPollThreads=" + ackPollThreads + ", maxHttpConnPerChannel=" + maxHttpConnPerChannel + ", totalHecChannels=" + totalHecChannels + ", socketTimeout=" + socketTimeout + ", enrichements='" + enrichements + '\'' + ", enrichementMap=" + enrichementMap + ", trackData=" + trackData + ", maxBatchSize=" + maxBatchSize + ", numOfThreads=" + numOfThreads + '}';
465476
}
466477
}

src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public void getHecConfigCustomKeystore() {
8383
HecConfig config = connectorConfig.getHecConfig();
8484
Assert.assertEquals(true, config.getHasCustomTrustStore());
8585
Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath());
86+
Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType());
8687
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword());
8788
}
8889

@@ -95,9 +96,10 @@ public void testCustomKeystore() throws KeyStoreException {
9596
HecConfig config = connectorConfig.getHecConfig();
9697
Assert.assertEquals(true, config.getHasCustomTrustStore());
9798
Assert.assertEquals(uu.configProfile.getTrustStorePath(), config.getTrustStorePath());
99+
Assert.assertEquals(uu.configProfile.getTrustStoreType(), config.getTrustStoreType());
98100
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), config.getTrustStorePassword());
99101

100-
SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(),config.getTrustStorePassword());
102+
SSLContext context = Hec.loadCustomSSLContext(config.getTrustStorePath(), config.getTrustStoreType(), config.getTrustStorePassword());
101103
Assert.assertNotNull(context);
102104

103105
}
@@ -315,6 +317,7 @@ private void commonAssert(final SplunkSinkConnectorConfig connectorConfig) {
315317
Assert.assertEquals(uu.configProfile.isHttpKeepAlive(), connectorConfig.httpKeepAlive);
316318
Assert.assertEquals(uu.configProfile.isValidateCertificates(), connectorConfig.validateCertificates);
317319
Assert.assertEquals(uu.configProfile.getTrustStorePath(), connectorConfig.trustStorePath);
320+
Assert.assertEquals(uu.configProfile.getTrustStoreType(), connectorConfig.trustStoreType);
318321
Assert.assertEquals(uu.configProfile.getTrustStorePassword(), connectorConfig.trustStorePassword);
319322
Assert.assertEquals(uu.configProfile.getEventBatchTimeout(), connectorConfig.eventBatchTimeout);
320323
Assert.assertEquals(uu.configProfile.getAckPollInterval(), connectorConfig.ackPollInterval);

src/test/java/com/splunk/kafka/connect/UnitUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public Map<String, String> createTaskConfig() {
4545

4646
if(configProfile.getTrustStorePath() != null ) {
4747
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PATH_CONF, configProfile.getTrustStorePath());
48+
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_TYPE_CONF, configProfile.getTrustStoreType());
4849
config.put(SplunkSinkConnectorConfig.SSL_TRUSTSTORE_PASSWORD_CONF, configProfile.getTrustStorePassword());
4950
}
5051

src/test/resources/keystoretest.p12

2.66 KB
Binary file not shown.

0 commit comments

Comments
 (0)