Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions v2/spanner-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigException;
import java.io.FileNotFoundException;
import java.net.URL;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A common static utility class that allows the spanner migration pipelines to ingest Cassandra
* Driver config file from GCS. Cassandra has a structured config file to accept all the driver
* parameters, be it list of host ip addresses, credentials, retry policy and many more. Most of
* these parameters are very specific to the Cassandra Database. Refer to the <a
* href=>https://docs.datastax.com/en/developer/java-driver/4.3/manual/core/configuration/reference/index.html>reference
* configuration</a> for the file format.
*/
public final class CassandraDriverConfigLoader {

private static final Logger LOG = LoggerFactory.getLogger(CassandraDriverConfigLoader.class);
private static final ImmutableMap<String, TypedDriverOption> OPTIONS_SUPPORTED_BY_DRIVER =
getOptionsSupportedByDriver();

/**
* Load the Cassandra Config from a file as a {@link DriverConfigLoader}.
*
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
* @return DriverConfigLoader.
* @throws FileNotFoundException - If file is not found at specified path.
*/
public static DriverConfigLoader loadFile(String path) throws FileNotFoundException {
URL url = loadSingleFile(path);
LOG.debug("Loaded Cassandra Driver config from path {}", path);
try {
DriverConfigLoader.fromUrl(url).getInitialConfig();
return DriverConfigLoader.fromUrl(url);
} catch (ConfigException.Parse parseException) {
LOG.error(
"Parsing error while parsing Cassandra Driver config from path {}", path, parseException);
throw parseException;
}
}

/**
* Load the Cassandra Config from a file as a {@link java.io.Serializable} {@link OptionsMap}.
* This {@link OptionsMap} can be stored in any object that needs to implement {@link
* java.io.Serializable}. At the time of opening a connection to Cassandra, it can be deserialized
* by {@link CassandraDriverConfigLoader#fromOptionsMap(OptionsMap)}. Note: Implementation Detail,
* Cassandra Driver does not provide a direct method to convert a link {@link DriverConfigLoader}
* into an {@link OptionsMap}, or build an {@link OptionsMap} from a file.
*
* @param path A complete gcs path to the config file of the form "gs://path/to/file".
* @return DriverConfigLoader.
* @throws FileNotFoundException - If file is not found at specified path.
*/
public static OptionsMap getOptionsMapFromFile(String path) throws FileNotFoundException {
OptionsMap optionsMap = new OptionsMap();
DriverConfigLoader configLoader = loadFile(path);
configLoader
.getInitialConfig()
.getProfiles()
.forEach(
(profileName, profile) ->
profile.entrySet().forEach(e -> putInOptionsMap(optionsMap, profileName, e)));

return optionsMap;
}

/**
* Load the {@link DriverConfigLoader} from {@link java.io.Serializable} {@link OptionsMap} which
* was obtained as a part of {@link CassandraDriverConfigLoader#getOptionsMapFromFile(String)}.
*
* @param optionsMap
* @return DriverConfigLoader.
*/
public static DriverConfigLoader fromOptionsMap(OptionsMap optionsMap) {
return DriverConfigLoader.fromMap(optionsMap);
}

@VisibleForTesting
protected static URL loadSingleFile(String path) throws FileNotFoundException {
URL[] urls = JarFileReader.saveFilesLocally(path);
if (urls.length == 0) {
LOG.error("Could not load any Cassandra driver config file from specified path {}", path);
throw (new FileNotFoundException("No file found in path " + path));
}
if (urls.length > 1) {
LOG.error(
"Need to provide a single Cassandra driver config file in the specified path {}. Found {} ",
path,
urls);
throw (new IllegalArgumentException(
String.format(
"Need to provide a single Cassandra driver config file in the specified path %s. Found %d files",
path, urls.length)));
}
return urls[0];
}

@VisibleForTesting
protected static void putInOptionsMap(
OptionsMap optionsMap, String profileName, Entry<String, Object> e) {

TypedDriverOption option = OPTIONS_SUPPORTED_BY_DRIVER.get(e.getKey());
if (Objects.equal(option, null)) {
LOG.error(
"Unknown Cassandra Option {}, Options supported by driver = {}",
e.getKey(),
OPTIONS_SUPPORTED_BY_DRIVER);
throw new IllegalArgumentException(
String.format(
"Unknown Cassandra Driver Option %s. Supported Options = %s",
e.getKey(), OPTIONS_SUPPORTED_BY_DRIVER));
}
optionsMap.put(profileName, option, e.getValue());
}

private static ImmutableMap<String, TypedDriverOption> getOptionsSupportedByDriver() {
ImmutableMap.Builder<String, TypedDriverOption> mapBuilder = ImmutableMap.builder();
TypedDriverOption.builtInValues().forEach(e -> mapBuilder.put(e.getRawOption().getPath(), e));
return mapBuilder.build();
}

private CassandraDriverConfigLoader() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** TODO: rename this to FileReader. */
public class JarFileReader {

private static final Logger LOG = LoggerFactory.getLogger(JarFileReader.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTACT_POINTS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.RETRY_POLICY_CLASS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mockStatic;

import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.typesafe.config.ConfigException;
import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;

/** Test class for {@link CassandraDriverConfigLoader}. */
@RunWith(MockitoJUnitRunner.class)
public class CassandraDriverConfigLoaderTest {
MockedStatic mockFileReader;

@Before
public void initialize() {
mockFileReader = mockStatic(JarFileReader.class);
}

@Test
public void testCassandraDriverConfigLoaderBasic()
throws FileNotFoundException, MalformedURLException {
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
DriverConfigLoader driverConfigLoader = CassandraDriverConfigLoader.loadFile(testGcsPath);
assertThat(
driverConfigLoader
.getInitialConfig()
.getProfiles()
.get("default")
.getStringList(CONTACT_POINTS))
.isEqualTo(List.of("127.0.0.1:9042", "127.0.0.2:9042"));
;
assertThat(
driverConfigLoader
.getInitialConfig()
.getProfiles()
.get("default")
.getString(RETRY_POLICY_CLASS))
.isEqualTo("DefaultRetryPolicy");
}

@Test
public void testCassandraDriverConfigLoadError()
throws FileNotFoundException, MalformedURLException {
String testGcsPathNotFound = "gs://smt-test-bucket/cassandraConfigNotFound.conf";
String testGcsPathList =
"gs://smt-test-bucket/cassandraConfig1.conf,gs://smt-test-bucket/cassandraConfig2.conf";

URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPathNotFound))
.thenReturn(new URL[] {});
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPathList))
.thenReturn(
new URL[] {
Resources.getResource("test-cassandra-config.conf"),
Resources.getResource("test-cassandra-config.conf")
});
assertThrows(
FileNotFoundException.class,
() -> CassandraDriverConfigLoader.loadFile(testGcsPathNotFound));
assertThrows(
IllegalArgumentException.class,
() -> CassandraDriverConfigLoader.loadFile(testGcsPathList));
}

@Test
public void testCassandraDriverConfigParseError()
throws FileNotFoundException, MalformedURLException {
String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config-parse-err.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
assertThrows(
ConfigException.Parse.class, () -> CassandraDriverConfigLoader.loadFile(testGcsPath));
}

@Test
public void testOptionsMapConversion() throws FileNotFoundException {

String testGcsPath = "gs://smt-test-bucket/cassandraConfig.conf";
URL testUrl = Resources.getResource("test-cassandra-config.conf");
mockFileReader
.when(() -> JarFileReader.saveFilesLocally(testGcsPath))
.thenReturn(new URL[] {testUrl});
DriverConfigLoader driverConfigLoaderDirect = CassandraDriverConfigLoader.loadFile(testGcsPath);
OptionsMap optionsMap = CassandraDriverConfigLoader.getOptionsMapFromFile(testGcsPath);
DriverConfigLoader driverConfigLoaderFromOptionsMap =
CassandraDriverConfigLoader.fromOptionsMap(optionsMap);
ImmutableMap<String, ImmutableMap<String, String>> directLoadMap =
driverConfigMap(driverConfigLoaderDirect);
ImmutableMap<String, ImmutableMap<String, String>> fromOptionsMap =
driverConfigMap(driverConfigLoaderFromOptionsMap);

assertThat(directLoadMap).isEqualTo(fromOptionsMap);

assertThrows(
IllegalArgumentException.class,
() -> {
OptionsMap optionsMapToLoad = new OptionsMap();
CassandraDriverConfigLoader.putInOptionsMap(
optionsMapToLoad, "default", new SimpleEntry<>("Unsupported", "Unsupported"));
});
}

private static ImmutableMap<String, ImmutableMap<String, String>> driverConfigMap(
DriverConfigLoader driverConfigLoaderDirect) {
ImmutableMap.Builder<String, ImmutableMap<String, String>> driverConfigMap =
ImmutableMap.builder();
driverConfigLoaderDirect
.getInitialConfig()
.getProfiles()
.forEach(
(profile, options) -> {
ImmutableMap.Builder<String, String> profileMapBuilder = ImmutableMap.builder();
options
.entrySet()
.forEach(
e -> profileMapBuilder.put(e.getKey().toString(), e.getValue().toString()));
driverConfigMap.put(profile, profileMapBuilder.build());
});
return driverConfigMap.build();
}

@After
public void cleanup() {
mockFileReader.close();
mockFileReader = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Configuration for the DataStax Java driver for Apache Cassandra®.
# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md.
# This file has an intentional parsing error, to help test exception handling for cases where the config file does not get parsed.
# DO NOT USE FOR PRODUCTION.

datastax-java-driver {
basic.contact-points = [ "127.0.0.1:9042", ]
}
}
Loading
Loading