Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public final class TextIOToBigQueryIT extends TemplateTestBase {

private static final String SCHEMA_PATH = "TextIOToBigQueryTest/schema.json";
private static final String INPUT_PATH = "TextIOToBigQueryTest/input.txt";
private static final String SCHEMA_FNAME = "schema.json";
private static final String INPUT_FNAME = "İnput.json"; // non-ascii file name
private static final String UDF_PATH = "TextIOToBigQueryTest/udf.js";
private static final String PYUDF_PATH = "TextIOToBigQueryTest/pyudf.py";
private BigQueryResourceManager bigQueryClient;
Expand Down Expand Up @@ -116,8 +118,9 @@ public void testTextIOToBigQueryWithPythonUdf() throws IOException {
private void testTextIOToBigQuery(
Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder) throws IOException {
// Arrange
gcsClient.uploadArtifact("schema.json", Resources.getResource(SCHEMA_PATH).getPath());
gcsClient.uploadArtifact("input.txt", Resources.getResource(INPUT_PATH).getPath());
gcsClient.uploadArtifact(SCHEMA_FNAME, Resources.getResource(SCHEMA_PATH).getPath());
// non-ascii file name
gcsClient.uploadArtifact(INPUT_FNAME, Resources.getResource(INPUT_PATH).getPath());

bigQueryClient.createDataset(REGION);
TableId table =
Expand All @@ -139,8 +142,8 @@ private void testTextIOToBigQuery(
launchTemplate(
paramsAdder.apply(
LaunchConfig.builder(testName, specPath)
.addParameter("JSONPath", getGcsPath("schema.json"))
.addParameter("inputFilePattern", getGcsPath("input.txt"))
.addParameter("JSONPath", getGcsPath(SCHEMA_FNAME))
.addParameter("inputFilePattern", getGcsPath(INPUT_FNAME))
.addParameter("outputTable", toTableSpecLegacy(table))
.addParameter("bigQueryLoadingTemporaryDirectory", getGcsPath("bq-tmp"))));
assertThatPipeline(info).isRunning();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.kms.v1.CryptoKey;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.base.Strings;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class JdbcToBigQueryIT extends JDBCBaseIT {
private static final String NAME = "name";
private static final String FULL_NAME = "full_name";
private static final String AGE = "age";
private static final String AGE_FR = "âge"; // non-ascii field name
private static final String MEMBER = "member";
private static final String IS_MEMBER = "is_member";
private static final String ENTRY_ADDED = "entry_added";
Expand Down Expand Up @@ -115,7 +117,7 @@ public void testMySqlToBigQueryFlex() throws IOException {
HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "NUMERIC NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "NUMERIC");
columns.put(AGE_FR, "NUMERIC");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
Expand All @@ -132,7 +134,7 @@ public void testMySqlToBigQueryFlex() throws IOException {
config ->
config.addParameter(
"query",
"SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
"SELECT ROW_ID, NAME AS FULL_NAME, âge, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
+ testName));
}

Expand Down Expand Up @@ -201,48 +203,19 @@ public void testMySqlToBigQueryWithStorageWriteApi() throws IOException {
}

/**
 * Integration test that runs the shared {@code simpleJdbcToBigQueryTest} scenario against a
 * Postgres resource manager, supplying the source SQL inline through the {@code query}
 * template parameter.
 *
 * <p>The query renames {@code NAME} to {@code FULL_NAME} and {@code MEMBER} to {@code IS_MEMBER}.
 *
 * @throws IOException declared because {@code simpleJdbcToBigQueryTest} declares it
 */
@Test
public void testPostgresToBigQueryFlex() throws IOException {
// Create postgres Resource manager
postgresResourceManager = PostgresResourceManager.builder(testName).build();

// Column name -> SQL column definition for the source schema.
HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "INTEGER NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "INTEGER");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
// NOTE(review): ROW_ID is passed as the second argument, presumably the id column — confirm
// against JDBCResourceManager.JDBCSchema.
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);

// Run a simple IT
simpleJdbcToBigQueryTest(
testName,
schema,
POSTGRES_DRIVER,
postgresDriverGCSPath(),
postgresResourceManager,
true, // NOTE(review): presumably useColumnAlias (matches the AS aliases below) — confirm
false, // useDlq: this test does not exercise the dead-letter path
config ->
config.addParameter(
"query",
"SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
+ testName));
}

@Test
public void testPostgresWithUnicodeCharactersInQuery() throws IOException {
public void testPostgresStageQueryInGcs() throws IOException {
String tableName = "unicóde_table";

postgresResourceManager = PostgresResourceManager.builder(testName).build();
gcsClient.createArtifact(
"input/query.sql",
"SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
"SELECT ROW_ID, NAME AS FULL_NAME, âge, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
+ tableName);

HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "INTEGER NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "INTEGER");
columns.put(AGE_FR, "INTEGER");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
Expand Down Expand Up @@ -275,7 +248,7 @@ public void testOracleToBigQueryFlex() throws IOException {
HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "NUMERIC NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "NUMERIC");
columns.put(AGE_FR, "NUMERIC");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
Expand All @@ -292,7 +265,7 @@ public void testOracleToBigQueryFlex() throws IOException {
config ->
config.addParameter(
"query",
"SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
"SELECT ROW_ID, NAME AS FULL_NAME, âge, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
+ testName));
}

Expand All @@ -305,7 +278,7 @@ public void testMsSqlToBigQueryFlex() throws IOException {
HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "NUMERIC NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "NUMERIC");
columns.put(AGE_FR, "NUMERIC");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
Expand All @@ -322,18 +295,18 @@ public void testMsSqlToBigQueryFlex() throws IOException {
config ->
config.addParameter(
"query",
"SELECT ROW_ID, NAME AS FULL_NAME, AGE, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
"SELECT ROW_ID, NAME AS FULL_NAME, âge, MEMBER AS IS_MEMBER, ENTRY_ADDED FROM "
+ testName));
}

@Test
public void testReadWithPartitions() throws IOException {
public void testPostgresReadWithPartitions() throws IOException {
postgresResourceManager = PostgresResourceManager.builder(testId).build();

HashMap<String, String> columns = new HashMap<>();
columns.put(ROW_ID, "INTEGER NOT NULL");
columns.put(NAME, "VARCHAR(200)");
columns.put(AGE, "INTEGER");
columns.put(AGE_FR, "INTEGER");
columns.put(MEMBER, "VARCHAR(200)");
columns.put(ENTRY_ADDED, "VARCHAR(200)");
JDBCResourceManager.JDBCSchema schema = new JDBCResourceManager.JDBCSchema(columns, ROW_ID);
Expand Down Expand Up @@ -385,9 +358,13 @@ private void simpleJdbcToBigQueryTest(
boolean useDlq,
Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder)
throws IOException {
PipelineLauncher.LaunchConfig.Builder options =
paramsAdder.apply(PipelineLauncher.LaunchConfig.builder(testName, specPath));

// Arrange
List<String> columns = new ArrayList<>(List.of(ROW_ID, NAME, AGE, MEMBER, ENTRY_ADDED));
// TODO: cover unicode field names in all tests once StorageWriteApi supports them
boolean unicodeFieldName = Strings.isNullOrEmpty(options.getParameter("useStorageWriteApi"));
final String ageField = unicodeFieldName ? AGE_FR : AGE;
List<String> columns = new ArrayList<>(List.of(ROW_ID, NAME, ageField, MEMBER, ENTRY_ADDED));
if (useDlq) {
columns.add(FAKE);
}
Expand All @@ -399,7 +376,7 @@ private void simpleJdbcToBigQueryTest(
Arrays.asList(
Field.of(ROW_ID, StandardSQLTypeName.INT64),
Field.of(useColumnAlias ? FULL_NAME : NAME, StandardSQLTypeName.STRING),
Field.of(AGE, StandardSQLTypeName.FLOAT64),
Field.of(ageField, StandardSQLTypeName.FLOAT64),
Field.of(useColumnAlias ? IS_MEMBER : MEMBER, StandardSQLTypeName.STRING),
Field.of(ENTRY_ADDED, StandardSQLTypeName.STRING));
Schema bqSchema = Schema.of(bqSchemaFields);
Expand All @@ -410,26 +387,22 @@ private void simpleJdbcToBigQueryTest(
Function<String, String> encrypt =
message -> kmsResourceManager.encrypt(KEYRING_ID, CRYPTO_KEY_NAME, message);
CryptoKey cryptoKey = kmsResourceManager.getOrCreateCryptoKey(KEYRING_ID, CRYPTO_KEY_NAME);

PipelineLauncher.LaunchConfig.Builder options =
paramsAdder.apply(
PipelineLauncher.LaunchConfig.builder(testName, specPath)
.addParameter("connectionURL", encrypt.apply(jdbcResourceManager.getUri()))
.addParameter("driverClassName", driverClassName)
.addParameter("outputTable", toTableSpecLegacy(table))
.addParameter("driverJars", driverJars)
.addParameter("bigQueryLoadingTemporaryDirectory", getGcsBasePath() + "/temp")
.addParameter("username", encrypt.apply(jdbcResourceManager.getUsername()))
.addParameter("password", encrypt.apply(jdbcResourceManager.getPassword()))
.addParameter("KMSEncryptionKey", cryptoKey.getName())
.addParameter("useColumnAlias", "true")
.addParameter("fetchSize", "100000")
.addParameter("connectionProperties", "characterEncoding=UTF-8")
.addParameter("disabledAlgorithms", "SSLv3, GCM"));
options
.addParameter("connectionURL", encrypt.apply(jdbcResourceManager.getUri()))
.addParameter("driverClassName", driverClassName)
.addParameter("outputTable", toTableSpecLegacy(table))
.addParameter("driverJars", driverJars)
.addParameter("bigQueryLoadingTemporaryDirectory", getGcsBasePath() + "/temp")
.addParameter("username", encrypt.apply(jdbcResourceManager.getUsername()))
.addParameter("password", encrypt.apply(jdbcResourceManager.getPassword()))
.addParameter("KMSEncryptionKey", cryptoKey.getName())
.addParameter("useColumnAlias", "true")
.addParameter("fetchSize", "100000")
.addParameter("connectionProperties", "characterEncoding=UTF-8")
.addParameter("disabledAlgorithms", "SSLv3, GCM");
if (useDlq) {
options.addParameter("outputDeadletterTable", toTableSpecLegacy(table) + "_error_records");
}

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
assertThatPipeline(info).isRunning();
Expand Down
Loading