Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions v2/sourcedb-to-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,5 +158,11 @@
<version>4.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.60</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,46 @@
import com.github.nosan.embedded.cassandra.commons.ClassPathResource;
import com.github.nosan.embedded.cassandra.cql.CqlScript;
import com.google.common.collect.ImmutableList;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.Security;
import java.security.cert.X509Certificate;
import java.util.Date;
import javax.annotation.Nullable;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.X509v3CertificateBuilder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;

/**
* Utility Class to start and stop Embedded Cassandra. {@link Cassandra Embedded Cassandra} is
* equivalent to real cassandra at the level of network protocol. So using this over mocks wherever
* possible gives us much better test coverage. Note: Prefer using {@link SharedEmbeddedCassandra}
* to share an instance of Embedded Cassandra.
*
* <p>Note on SSL Mode:
*
* <p>When the test Cassandra Server has to run with SSL enabled, it needs to present an SSL
* certificate to the client, which the client can verify. In a UT environment, we won't have a
* certificate authority that will sign the certificates. For this, We can either check in a private
* Key and Cert to the repo itself which is used in UT, which is less ideal, or, We can generate a
* temporary random key and certificate which would be used by the server and trusted by the client
* in a UT setting. We are taking the later route in order to avoid having to check in keys and
* certificates to the repo.
*/
public class EmbeddedCassandra implements AutoCloseable {
private Cassandra embeddedCassandra;
Expand All @@ -40,23 +70,48 @@ public class EmbeddedCassandra implements AutoCloseable {
private final Settings settings;
private static final String LOCAL_DATA_CENTER = "datacenter1";

public EmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
/** Temporary file for storing the certificate key. */
private java.io.File keyStoreFile = null;

/** Temporary file for storing the certificate. */
private java.io.File trustStoreFile = null;

public EmbeddedCassandra(String config, @Nullable String cqlResource, boolean clientEncryption)
throws IOException {
var builder =
new CassandraBuilder()
.addEnvironmentVariable("JAVA_HOME", System.getProperty("java.home"))
.addEnvironmentVariable("JRE_HOME", System.getProperty("jre.home"))
// Check [CASSANDRA-13396](https://issues.apache.org/jira/browse/CASSANDRA-13396)
.addSystemProperty("cassandra.insecure.udf", "true")
.configFile(new ClassPathResource(config));
.configFile(new ClassPathResource(config))
// Choose from available ports on the test machine.
.addConfigProperty("native_transport_port", 0)
.addConfigProperty("storage_port", 0)
.addSystemProperty("cassandra.jmx.local.port", 0)
.registerShutdownHook(true);
if (clientEncryption) {

// Generate temporary keystore and truststore files
keyStoreFile = java.io.File.createTempFile("client", ".keystore");
trustStoreFile = java.io.File.createTempFile("client", ".truststore");
builder =
builder
.addConfigProperty("client_encryption_options.enabled", true)
.addConfigProperty("client_encryption_options.optional", true)
.addConfigProperty(
"client_encryption_options.keystore", keyStoreFile.getAbsolutePath());
createTemporaryKeyStore(keyStoreFile, trustStoreFile);
}
// Ref: https://stackoverflow.com/questions/78195798/embedded-cassandra-not-working-in-java-21
if (Runtime.version().compareTo(Runtime.Version.parse("12")) >= 0) {
builder = builder.addSystemProperty("java.security.manager", "allow");
}
/*
* TODO (vardhanvthigle): Get EmbeddedCassandea 4.0 working with our UT JVM.
* TODO (vardhanvthigle): Get EmbeddedCassandra 4.0 working with our UT JVM.
// If we spawn Cassandra 4.0.0 for testing, it tries to set biased locking, which is not recognized by some JVMs.
builder = builder.addJvmOptions("-XX:+IgnoreUnrecognizedVMOptions");
// This is needed as Cassnadra 4.0 goes for deep reflections for java pacakges.
// This is needed as Cassandra 4.0 goes for deep reflections for java packages.
builder = builder.addEnvironmentVariable("JDK_JAVA_OPTIONS", "--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED"
+ "--add-opens java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED");
builder = builder.version("4.0.15");
Expand All @@ -78,6 +133,91 @@ public EmbeddedCassandra(String config, @Nullable String cqlResource) throws IOE
}
}

/** Generate a Random KeyPair for Signing the SSL certificate in UT environment. */
private static KeyPair generateTestKeyPair() throws NoSuchAlgorithmException {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(2048);
return keyPairGenerator.generateKeyPair();
}

/** Generate a random Key Pair and a Self Signed Certificate for the UT environment. */
private static void createTemporaryKeyStore(
java.io.File keyStoreFile, java.io.File trustStoreFile) {
Security.addProvider(new BouncyCastleProvider());

try {
// Generate KeyPair
KeyPair keyPair = generateTestKeyPair();

// Generate Certificate
X509Certificate certificate = generateTestCertificate(keyPair);

// Create and save keystore
createKeyStore(keyStoreFile, keyPair, certificate, "cassandra".toCharArray());

// Create and save truststore
createTrustStore(trustStoreFile, certificate, "cassandra".toCharArray());

} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void createKeyStore(
java.io.File keyStoreFile, KeyPair keyPair, X509Certificate certificate, char[] password)
throws Exception {
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(null, null);
keyStore.setKeyEntry(
"client",
keyPair.getPrivate(),
password,
new java.security.cert.Certificate[] {certificate});
try (FileOutputStream fos = new FileOutputStream(keyStoreFile)) {
keyStore.store(fos, password);
}
}

private static void createTrustStore(
java.io.File trustStoreFile, X509Certificate certificate, char[] password) throws Exception {
KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(null, null);
trustStore.setCertificateEntry("localhost", certificate);
try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
trustStore.store(fos, password);
}
}

/** Generate a selfsigned test certificate. */
private static X509Certificate generateTestCertificate(KeyPair keyPair) throws Exception {
// Prepare necessary information
X500Name issuer = new X500Name("CN=localhost");
BigInteger serial = new BigInteger(160, new SecureRandom());
Date notBefore = new Date();
Date notAfter = new Date(notBefore.getTime() + 365 * 24 * 60 * 60 * 1000L); // 1 year validity
X500Name subject = issuer;
SubjectPublicKeyInfo publicKeyInfo =
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());

// Create certificate builder
X509v3CertificateBuilder certBuilder =
new X509v3CertificateBuilder(issuer, serial, notBefore, notAfter, subject, publicKeyInfo);

// Add Basic Constraints (optional, for CA certificates)
certBuilder.addExtension(
org.bouncycastle.asn1.x509.Extension.basicConstraints, true, new BasicConstraints(true));

// Create content signer
ContentSigner contentSigner =
new JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate());

// Build the certificate holder
X509CertificateHolder certHolder = certBuilder.build(contentSigner);

// Convert to X509Certificate
return new JcaX509CertificateConverter().getCertificate(certHolder);
}

public Cassandra getEmbeddedCassandra() {
return embeddedCassandra;
}
Expand All @@ -98,10 +238,27 @@ public ImmutableList<InetSocketAddress> getContactPoints() {
return this.contactPoints;
}

public Path getKeyStorePath() {
return this.keyStoreFile.toPath();
}

public Path getTrustStorePath() {
return this.trustStoreFile.toPath();
}

@Override
public void close() throws Exception {
if (embeddedCassandra != null) {
embeddedCassandra.stop();
}

if (keyStoreFile != null && keyStoreFile.exists()) {
keyStoreFile.delete();
keyStoreFile = null;
}
if (trustStoreFile != null && trustStoreFile.exists()) {
trustStoreFile.delete();
trustStoreFile = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ public class SharedEmbeddedCassandra implements AutoCloseable {
*
* @param config - config.yaml
* @param cqlResource - cql script.
* @param clientEncryption - set to true if Client side SSL is needed.
* @throws IOException
*/
public SharedEmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
this.config = Configuration.create(config, cqlResource);
public SharedEmbeddedCassandra(
String config, @Nullable String cqlResource, Boolean clientEncryption) throws IOException {
this.config = Configuration.create(config, cqlResource, clientEncryption);
this.embeddedCassandra = getEmbeddedCassandra(this.config);
}

public SharedEmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
this(config, cqlResource, Boolean.FALSE);
}

/**
* Get a reference to {@link com.github.nosan.embedded.cassandra.Cassandra Embedded Cassandra}
* managed by {@link SharedEmbeddedCassandra}.
Expand Down Expand Up @@ -90,7 +96,10 @@ private static EmbeddedCassandra getEmbeddedCassandra(Configuration configuratio
} else {
Log.info("Starting Shared embedded Cassandra for configuration = {}", configuration);
embeddedCassandra =
new EmbeddedCassandra(configuration.configYaml(), configuration.cqlScript());
new EmbeddedCassandra(
configuration.configYaml(),
configuration.cqlScript(),
configuration.clientEncryption());
RefCountedEmbeddedCassandra refCountedEmbeddedCassandra =
RefCountedEmbeddedCassandra.create(embeddedCassandra);
refCountedEmbeddedCassandra.refIncrementAndGet();
Expand Down Expand Up @@ -123,15 +132,20 @@ private static void putEmbeddedCassandra(Configuration configuration) throws Exc
abstract static class Configuration {
public AtomicInteger refCount = new AtomicInteger();

public static Configuration create(String configYaml, String cqlScript) {
return new AutoValue_SharedEmbeddedCassandra_Configuration(configYaml, cqlScript);
public static Configuration create(
String configYaml, String cqlScript, Boolean clientEncryption) {
return new AutoValue_SharedEmbeddedCassandra_Configuration(
configYaml, cqlScript, clientEncryption);
}

@Nullable
public abstract String configYaml();

@Nullable
public abstract String cqlScript();

@Nullable
public abstract Boolean clientEncryption();
}

// This is a private class, and it must be ensured that refcounting is synchronized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ seed_provider:
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: "<ip1>,<ip2>,<ip3>"
- seeds: "127.0.0.1:7000"
- seeds: "127.0.0.1"

# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from
Expand Down
Loading