1
1
package com .scylladb .cdc .cql .driver3 ;
2
2
3
- import com .datastax .driver .core .Cluster ;
4
- import com .datastax .driver .core .ConsistencyLevel ;
5
- import com .datastax .driver .core .ProtocolVersion ;
6
- import com .datastax .driver .core .Session ;
3
+ import com .datastax .driver .core .*;
7
4
import com .datastax .driver .core .policies .DCAwareRoundRobinPolicy ;
8
5
import com .scylladb .cdc .cql .CQLConfiguration ;
6
+ import com .scylladb .cdc .cql .SslConfig ;
7
+ import io .netty .handler .ssl .SslContext ;
8
+ import io .netty .handler .ssl .SslContextBuilder ;
9
+
10
+ import javax .net .ssl .KeyManagerFactory ;
11
+ import javax .net .ssl .SSLException ;
12
+ import javax .net .ssl .TrustManagerFactory ;
13
+ import java .io .*;
14
+ import java .nio .file .Files ;
15
+ import java .nio .file .Paths ;
16
+ import java .security .KeyStore ;
17
+ import java .security .KeyStoreException ;
18
+ import java .security .NoSuchAlgorithmException ;
19
+ import java .security .UnrecoverableKeyException ;
20
+ import java .security .cert .CertificateException ;
9
21
10
22
public class Driver3Session implements AutoCloseable {
11
23
private final Cluster driverCluster ;
@@ -18,6 +30,71 @@ public Driver3Session(CQLConfiguration cqlConfiguration) {
18
30
19
31
clusterBuilder = clusterBuilder .addContactPointsWithPorts (cqlConfiguration .contactPoints );
20
32
33
+ if (cqlConfiguration .sslConfig != null ) {
34
+ SslConfig sslConfig = cqlConfiguration .sslConfig ;
35
+ final SslContextBuilder sslContextBuilder = SslContextBuilder .forClient ();
36
+ System .out .println (sslConfig .getClass ().getProtectionDomain ().getCodeSource ().getLocation ());
37
+ sslContextBuilder .sslProvider (sslConfig .sslProvider );
38
+ if (sslConfig .trustStorePath != null ) {
39
+ final KeyStore trustKeyStore = createKeyStore (sslConfig .trustStorePath , sslConfig .trustStorePassword );
40
+
41
+ final TrustManagerFactory trustManagerFactory ;
42
+ try {
43
+ trustManagerFactory =
44
+ TrustManagerFactory .getInstance (TrustManagerFactory .getDefaultAlgorithm ());
45
+ trustManagerFactory .init (trustKeyStore );
46
+ } catch (NoSuchAlgorithmException e ) {
47
+ throw new RuntimeException ("Exception while creating TrustManagerFactory" , e );
48
+ } catch (KeyStoreException e ) {
49
+ throw new RuntimeException ("Exception while calling TrustManagerFactory.init()" , e );
50
+ }
51
+ sslContextBuilder .trustManager (trustManagerFactory );
52
+ }
53
+
54
+ if (sslConfig .keyStorePath != null ) {
55
+ final KeyStore keyStore = createKeyStore (sslConfig .keyStorePath , sslConfig .keyStorePassword );
56
+
57
+ final KeyManagerFactory keyManagerFactory ;
58
+ try {
59
+ keyManagerFactory = KeyManagerFactory .getInstance (KeyManagerFactory .getDefaultAlgorithm ());
60
+ keyManagerFactory .init (keyStore , sslConfig .keyStorePassword .toCharArray ());
61
+ } catch (NoSuchAlgorithmException e ) {
62
+ throw new RuntimeException ("Exception while creating KeyManagerFactory" , e );
63
+ } catch (UnrecoverableKeyException | KeyStoreException e ) {
64
+ throw new RuntimeException ("Exception while calling KeyManagerFactory.init()" , e );
65
+ }
66
+ sslContextBuilder .keyManager (keyManagerFactory );
67
+ }
68
+
69
+ if (sslConfig .cipherSuites .size () > 0 ) {
70
+ sslContextBuilder .ciphers (sslConfig .cipherSuites );
71
+ }
72
+
73
+ if (sslConfig .certPath != null && sslConfig .privateKeyPath != null ) {
74
+ try {
75
+ sslContextBuilder .keyManager (new BufferedInputStream (new FileInputStream (sslConfig .certPath )),
76
+ new BufferedInputStream (new FileInputStream (sslConfig .privateKeyPath )));
77
+ } catch (IllegalArgumentException e ) {
78
+ throw new RuntimeException (String .format ("Invalid certificate or private key: %s" , e .getMessage ()));
79
+ } catch (FileNotFoundException e ) {
80
+ throw new RuntimeException ("Invalid certificate or private key file path" , e );
81
+ }
82
+ } else if ((sslConfig .certPath == null ) != (sslConfig .privateKeyPath == null )) {
83
+ throw new RuntimeException (String .format ("%s cannot be set without %s and vice-versa: %s is not set" ,
84
+ "scylla.ssl.openssl.keyCertChain" , "scylla.ssl.openssl.privateKey" ,
85
+ (sslConfig .certPath == null ) ? "scylla.ssl.openssl.keyCertChain" : "scylla.ssl.openssl.privateKey" ));
86
+ }
87
+
88
+ final SslContext context ;
89
+ try {
90
+ context = sslContextBuilder .build ();
91
+ } catch (SSLException e ) {
92
+ throw new RuntimeException (e );
93
+ }
94
+ final SSLOptions sslOptions = new RemoteEndpointAwareNettySSLOptions (context );
95
+ clusterBuilder .withSSL (sslOptions );
96
+ }
97
+
21
98
// Deliberately set the protocol version to V4,
22
99
// as V5 implements returning a metadata id (schema id)
23
100
// per each page. Our implementation of Driver3WorkerCQL
@@ -87,4 +164,21 @@ public void close() {
87
164
driverCluster .close ();
88
165
}
89
166
}
167
+
168
+ private KeyStore createKeyStore (String path , String password ) {
169
+ KeyStore keyStore ;
170
+ try {
171
+ keyStore = KeyStore .getInstance ("JKS" );
172
+ try (InputStream inputStream = Files .newInputStream (Paths .get (path ))) {
173
+ keyStore .load (inputStream , password .toCharArray ());
174
+ } catch (IOException e ) {
175
+ throw new RuntimeException ("Exception while reading keystore" , e );
176
+ } catch (CertificateException | NoSuchAlgorithmException e ) {
177
+ throw new RuntimeException ("Exception while loading keystore" , e );
178
+ }
179
+ } catch (KeyStoreException e ) {
180
+ throw new RuntimeException ("Exception while creating keystore" , e );
181
+ }
182
+ return keyStore ;
183
+ }
90
184
}
0 commit comments