Skip to content

Commit

Permalink
🎉 MySQL source: Comprehensive core extention to be more compatible wi…
Browse files Browse the repository at this point in the history
…th other JDBC sources (#3995)

* [#3994] Provide the possibility to redefine namespace and basic column names
  • Loading branch information
DoNotPanicUA authored Jun 9, 2021
1 parent 6ec8b31 commit 38dcb6f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,28 @@ public abstract class SourceComprehensiveTest extends SourceAbstractTest {

private static final Logger LOGGER = LoggerFactory.getLogger(SourceComprehensiveTest.class);

private final String TEST_COLUMN_NAME = "test_column";
private final List<TestDataHolder> testDataHolders = new ArrayList<>();

/**
* The column name will be used for a PK column in the test tables. Override it if default name is
* not valid for your source.
*
* @return Id column name
*/
protected String getIdColumnName() {
return "id";
}

/**
* The column name will be used for a test column in the test tables. Override it if default name is
* not valid for your source.
*
* @return Test column name
*/
protected String getTestColumnName() {
return "test_column";
}

/**
* Setup the test database. All tables and data described in the registered tests will be put there.
*
Expand All @@ -72,6 +91,14 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception
setupDatabaseInternal();
}

/**
* Provide a source namespace. It's allocated place for table creation. It also known ask "Database
* Schema" or "Dataset"
*
* @return source name space
*/
protected abstract String getNameSpace();

/**
* The test checks that connector can fetch prepared data without failure.
*/
Expand All @@ -91,7 +118,7 @@ public void testDataTypes() throws Exception {
String streamName = msg.getRecord().getStream();
List<String> expectedValuesForStream = expectedValues.get(streamName);
if (expectedValuesForStream != null) {
String value = getValueFromJsonNode(msg.getRecord().getData().get(TEST_COLUMN_NAME));
String value = getValueFromJsonNode(msg.getRecord().getData().get(getTestColumnName()));
assertTrue(expectedValuesForStream.contains(value),
"Returned value '" + value + "' by streamer " + streamName + " should be in the expected list: " + expectedValuesForStream);
expectedValuesForStream.remove(value);
Expand Down Expand Up @@ -145,15 +172,15 @@ private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
.stream()
.map(test -> new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withCursorField(Lists.newArrayList(getIdColumnName()))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", test.getNameWithTestPrefix()),
String.format("%s", config.get("database").asText()),
Field.of("id", JsonSchemaPrimitive.NUMBER),
Field.of(TEST_COLUMN_NAME, test.getAirbyteType()))
String.format("%s", getNameSpace()),
Field.of(getIdColumnName(), JsonSchemaPrimitive.NUMBER),
Field.of(getTestColumnName(), test.getAirbyteType()))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSourceDefinedPrimaryKey(List.of(List.of(getIdColumnName())))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))
.collect(Collectors.toList()));
Expand All @@ -169,6 +196,9 @@ private ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
public void addDataTypeTestData(TestDataHolder test) {
testDataHolders.add(test);
test.setTestNumber(testDataHolders.stream().filter(t -> t.getSourceType().equals(test.getSourceType())).count());
test.setNameSpace(getNameSpace());
test.setIdColumnName(getIdColumnName());
test.setTestColumnName(getTestColumnName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

public class TestDataHolder {

private static final String DEFAULT_CREATE_TABLE_SQL = "CREATE TABLE %1$s(id integer primary key, test_column %2$s);";
private static final String DEFAULT_INSERT_SQL = "INSERT INTO %1$s VALUES (%2$s, %3$s);";
private static final String DEFAULT_CREATE_TABLE_SQL = "CREATE TABLE %1$s(%2$s INTEGER PRIMARY KEY, %3$s %4$s)";
private static final String DEFAULT_INSERT_SQL = "INSERT INTO %1$s VALUES (%2$s, %3$s)";

private final String sourceType;
private final JsonSchemaPrimitive airbyteType;
Expand All @@ -42,7 +42,10 @@ public class TestDataHolder {
private final String createTablePatternSql;
private final String insertPatternSql;
private final String fullSourceDataType;
private String nameSpace;
private long testNumber;
private String idColumnName;
private String testColumnName;

TestDataHolder(String sourceType,
JsonSchemaPrimitive airbyteType,
Expand Down Expand Up @@ -192,10 +195,22 @@ public TestDataHolder build() {

}

public void setTestNumber(long testNumber) {
void setNameSpace(String nameSpace) {
this.nameSpace = nameSpace;
}

void setTestNumber(long testNumber) {
this.testNumber = testNumber;
}

void setIdColumnName(String idColumnName) {
this.idColumnName = idColumnName;
}

void setTestColumnName(String testColumnName) {
this.testColumnName = testColumnName;
}

public String getSourceType() {
return sourceType;
}
Expand All @@ -209,18 +224,19 @@ public List<String> getExpectedValues() {
}

public String getNameWithTestPrefix() {
return "test_" + testNumber + "_" + sourceType;
return nameSpace + "_" + testNumber + "_" + sourceType;
}

public String getCreateSqlQuery() {
return String.format(createTablePatternSql, getNameWithTestPrefix(), fullSourceDataType);
return String.format(createTablePatternSql, (nameSpace != null ? nameSpace + "." : "") + getNameWithTestPrefix(), idColumnName, testColumnName,
fullSourceDataType);
}

public List<String> getInsertSqlQueries() {
List<String> insertSqls = new ArrayList<>();
int rowId = 1;
for (String value : values) {
insertSqls.add(String.format(insertPatternSql, getNameWithTestPrefix(), rowId++, value));
insertSqls.add(String.format(insertPatternSql, (nameSpace != null ? nameSpace + "." : "") + getNameWithTestPrefix(), rowId++, value));
}
return insertSqls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ protected Database setupDatabase() throws Exception {
return database;
}

@Override
protected String getNameSpace() {
return container.getDatabaseName();
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ protected Database setupDatabase() throws Exception {
return database;
}

@Override
protected String getNameSpace() {
return container.getDatabaseName();
}

@Override
protected void initTests() {
addDataTypeTestData(
Expand Down

0 comments on commit 38dcb6f

Please sign in to comment.