From 38dcb6f6a38cd5ba1280428ec8897172319b6898 Mon Sep 17 00:00:00 2001 From: Andrii Leonets <30464745+DoNotPanicUA@users.noreply.github.com> Date: Wed, 9 Jun 2021 21:11:15 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20MySQL=20source:=20Comprehensive?= =?UTF-8?q?=20core=20extention=20to=20be=20more=20compatible=20with=20othe?= =?UTF-8?q?r=20JDBC=20sources=20(#3995)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [#3994] Provide the possibility to redefine namespace and basic column names --- .../source/SourceComprehensiveTest.java | 44 ++++++++++++++++--- .../standardtest/source/TestDataHolder.java | 28 +++++++++--- .../CdcMySqlSourceComprehensiveTest.java | 5 +++ .../mysql/MySqlSourceComprehensiveTest.java | 5 +++ 4 files changed, 69 insertions(+), 13 deletions(-) diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java index cc4825d82280..be8af318dec7 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java @@ -51,9 +51,28 @@ public abstract class SourceComprehensiveTest extends SourceAbstractTest { private static final Logger LOGGER = LoggerFactory.getLogger(SourceComprehensiveTest.class); - private final String TEST_COLUMN_NAME = "test_column"; private final List testDataHolders = new ArrayList<>(); + /** + * The column name will be used for a PK column in the test tables. Override it if default name is + * not valid for your source. + * + * @return Id column name + */ + protected String getIdColumnName() { + return "id"; + } + + /** + * The column name will be used for a test column in the test tables. Override it if default name is + * not valid for your source. + * + * @return Test column name + */ + protected String getTestColumnName() { + return "test_column"; + } + /** * Setup the test database. All tables and data described in the registered tests will be put there. * @@ -72,6 +91,14 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception setupDatabaseInternal(); } + /** + * Provide a source namespace. It's allocated place for table creation. It also known ask "Database + * Schema" or "Dataset" + * + * @return source name space + */ + protected abstract String getNameSpace(); + /** * The test checks that connector can fetch prepared data without failure. */ @@ -91,7 +118,7 @@ public void testDataTypes() throws Exception { String streamName = msg.getRecord().getStream(); List expectedValuesForStream = expectedValues.get(streamName); if (expectedValuesForStream != null) { - String value = getValueFromJsonNode(msg.getRecord().getData().get(TEST_COLUMN_NAME)); + String value = getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName())); assertTrue(expectedValuesForStream.contains(value), "Returned value '" + value + "' by streamer " + streamName + " should be in the expected list: " + expectedValuesForStream); expectedValuesForStream.remove(value); @@ -145,15 +172,15 @@ private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { .stream() .map(test -> new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList("id")) + .withCursorField(Lists.newArrayList(getIdColumnName())) .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream(CatalogHelpers.createAirbyteStream( String.format("%s", test.getNameWithTestPrefix()), - String.format("%s", config.get("database").asText()), - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of(TEST_COLUMN_NAME, test.getAirbyteType())) + String.format("%s", getNameSpace()), + Field.of(getIdColumnName(), JsonSchemaPrimitive.NUMBER), + Field.of(getTestColumnName(), test.getAirbyteType())) .withSourceDefinedCursor(true) - .withSourceDefinedPrimaryKey(List.of(List.of("id"))) + .withSourceDefinedPrimaryKey(List.of(List.of(getIdColumnName()))) .withSupportedSyncModes( Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))) .collect(Collectors.toList())); @@ -169,6 +196,9 @@ private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { public void addDataTypeTestData(TestDataHolder test) { testDataHolders.add(test); test.setTestNumber(testDataHolders.stream().filter(t -> t.getSourceType().equals(test.getSourceType())).count()); + test.setNameSpace(getNameSpace()); + test.setIdColumnName(getIdColumnName()); + test.setTestColumnName(getTestColumnName()); } } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java index 9dc9626ecd1c..edb2e1820224 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java @@ -32,8 +32,8 @@ public class TestDataHolder { - private static final String DEFAULT_CREATE_TABLE_SQL = "CREATE TABLE %1$s(id integer primary key, test_column %2$s);"; - private static final String DEFAULT_INSERT_SQL = "INSERT INTO %1$s VALUES (%2$s, %3$s);"; + private static final String DEFAULT_CREATE_TABLE_SQL = "CREATE TABLE %1$s(%2$s INTEGER PRIMARY KEY, %3$s %4$s)"; + private static final String DEFAULT_INSERT_SQL = "INSERT INTO %1$s VALUES (%2$s, %3$s)"; private final String sourceType; private final JsonSchemaPrimitive airbyteType; @@ -42,7 +42,10 @@ public class TestDataHolder { private final String createTablePatternSql; private final String insertPatternSql; private final String fullSourceDataType; + private String nameSpace; private long testNumber; + private String idColumnName; + private String testColumnName; TestDataHolder(String sourceType, JsonSchemaPrimitive airbyteType, @@ -192,10 +195,22 @@ public TestDataHolder build() { } - public void setTestNumber(long testNumber) { + void setNameSpace(String nameSpace) { + this.nameSpace = nameSpace; + } + + void setTestNumber(long testNumber) { this.testNumber = testNumber; } + void setIdColumnName(String idColumnName) { + this.idColumnName = idColumnName; + } + + void setTestColumnName(String testColumnName) { + this.testColumnName = testColumnName; + } + public String getSourceType() { return sourceType; } @@ -209,18 +224,19 @@ public List getExpectedValues() { } public String getNameWithTestPrefix() { - return "test_" + testNumber + "_" + sourceType; + return nameSpace + "_" + testNumber + "_" + sourceType; } public String getCreateSqlQuery() { - return String.format(createTablePatternSql, getNameWithTestPrefix(), fullSourceDataType); + return String.format(createTablePatternSql, (nameSpace != null ? nameSpace + "." : "") + getNameWithTestPrefix(), idColumnName, testColumnName, + fullSourceDataType); } public List getInsertSqlQueries() { List insertSqls = new ArrayList<>(); int rowId = 1; for (String value : values) { - insertSqls.add(String.format(insertPatternSql, getNameWithTestPrefix(), rowId++, value)); + insertSqls.add(String.format(insertPatternSql, (nameSpace != null ? nameSpace + "." : "") + getNameWithTestPrefix(), rowId++, value)); } return insertSqls; } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceComprehensiveTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceComprehensiveTest.java index d0f043f00f2b..01699b3b10f9 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceComprehensiveTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceComprehensiveTest.java @@ -90,6 +90,11 @@ protected Database setupDatabase() throws Exception { return database; } + @Override + protected String getNameSpace() { + return container.getDatabaseName(); + } + private void revokeAllPermissions() { executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceComprehensiveTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceComprehensiveTest.java index cffc713f416b..f311d117d17f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceComprehensiveTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlSourceComprehensiveTest.java @@ -86,6 +86,11 @@ protected Database setupDatabase() throws Exception { return database; } + @Override + protected String getNameSpace() { + return container.getDatabaseName(); + } + @Override protected void initTests() { addDataTypeTestData(