Skip to content

Commit

Permalink
mysql dv2 raw table impl
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 15, 2024
1 parent 97d1fa3 commit a8dc68e
Show file tree
Hide file tree
Showing 51 changed files with 478 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -27,9 +27,3 @@ dependencies {
implementation 'mysql:mysql-connector-java:8.0.22'
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
Expand All @@ -30,13 +31,15 @@
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -141,14 +144,24 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
throw new UnsupportedOperationException("mysql does not yet support DV2");
return new MysqlSqlGenerator();
}

@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand All @@ -161,7 +174,7 @@ public static void main(final String[] args) throws Exception {
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
throw new UnsupportedOperationException("Mysql does not yet support DV2");
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package io.airbyte.integrations.destination.mysql;

import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
Expand Down Expand Up @@ -56,7 +62,8 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
final String schemaName,
final String tableName)
throws Exception {
throw new UnsupportedOperationException("mysql does not yet support DV2");
// TODO ... how does this actually work?
insertRecordsInternal(database, records, schemaName, tableName);
}

private void loadDataIntoTable(final JdbcDatabase database,
Expand Down Expand Up @@ -129,7 +136,7 @@ private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQ
}

@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
protected String createTableQueryV1(String schemaName, String tableName) {
// MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column,
// 256 is enough
return String.format(
Expand All @@ -141,6 +148,28 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

protected String createTableQueryV2(String schemaName, String tableName) {
// MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column,
// 256 is enough
return String.format(
"""
CREATE TABLE IF NOT EXISTS %s.%s (\s
%s VARCHAR(256) PRIMARY KEY,
%s JSON,
%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
%s JSON
);
""",
schemaName,
tableName,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
JavaBaseConstants.COLUMN_NAME_AB_META);
}

public static class VersionCompatibility {

private final double version;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.airbyte.integrations.destination.mysql.typing_deduping

import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
import io.airbyte.integrations.destination.mysql.MySQLNameTransformer

class MysqlSqlGenerator : RawOnlySqlGenerator(MySQLNameTransformer()) {

override fun buildStreamId(
namespace: String,
name: String,
rawNamespaceOverride: String
): StreamId {
return StreamId(
namingTransformer.getNamespace(namespace),
namingTransformer.convertStreamName(name),
namingTransformer.getNamespace(rawNamespaceOverride),
// The default implementation is just convertStreamName(concatenate()).
// Wrap in getIdentifier to also truncate.
// This is probably only necessary because the mysql name transformer
// doesn't call convertStreamName in getIdentifier (probably a bug?).
// But that entire NameTransformer interface is a hot mess anyway.
namingTransformer.getIdentifier(
namingTransformer.convertStreamName(
concatenateRawTableName(namespace, name),
),
),
namespace,
name,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"raw_table_database": {
"type": "string",
"description": "The database to write raw tables into",
"title": "Raw table database (defaults to airbyte_internal)",
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ protected boolean supportObjectDataTypeTest() {

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
return getConfigFromTestContainer(db);
}

public static ObjectNode getConfigFromTestContainer(final MySQLContainer<?> db) {
return (ObjectNode) Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(db))
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
.put(JdbcUtils.PASSWORD_KEY, db.getPassword())
Expand Down Expand Up @@ -132,23 +136,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}

@Override
Expand All @@ -163,36 +166,39 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) {
db = new MySQLContainer<>("mysql:8.0");
db.start();
setLocalInFileToTrue();
revokeAllPermissions();
grantCorrectPermissions();
configureTestContainer(db);
}

public static void configureTestContainer(final MySQLContainer<?> db) {
setLocalInFileToTrue(db);
revokeAllPermissions(db);
grantCorrectPermissions(db);
}

private void setLocalInFileToTrue() {
executeQuery("set global local_infile=true");
private static void setLocalInFileToTrue(final MySQLContainer<?> db) {
executeQuery(db, "set global local_infile=true");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
private static void revokeAllPermissions(final MySQLContainer<?> db) {
executeQuery(db, "REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
private static void grantCorrectPermissions(final MySQLContainer<?> db) {
executeQuery(db, "GRANT ALTER, CREATE, INSERT, INDEX, UPDATE, DELETE, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
private static void executeQuery(final MySQLContainer<?> db, final String query) {
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
SQLDialect.MYSQL);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand All @@ -208,7 +214,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {
@Test
public void testCustomDbtTransformations() throws Exception {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
executeQuery(db, "GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
super.testCustomDbtTransformations();
}

Expand Down Expand Up @@ -330,7 +336,7 @@ public void testCheckIncorrectDataBaseFailure() {
unit = SECONDS)
@Test
public void testUserHasNoPermissionToDataBase() {
executeQuery("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");
executeQuery(db, "create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION);
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION);
final MySQLDestination destination = new MySQLDestination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
db.stop();
db.close();
}
Expand Down Expand Up @@ -128,18 +127,17 @@ private void grantCorrectPermissions() {
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.DEFAULT)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
SQLDialect.DEFAULT);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.airbyte.integrations.destination.mysql

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations
import io.airbyte.commons.json.Jsons
import java.sql.ResultSet
import java.sql.SQLException
import java.util.Locale

class MysqlTestSourceOperations : JdbcSourceOperations() {
@Throws(SQLException::class)
override fun copyToJsonField(resultSet: ResultSet, colIndex: Int, json: ObjectNode) {
val columnName = resultSet.metaData.getColumnName(colIndex)
val columnTypeName = resultSet.metaData.getColumnTypeName(colIndex).lowercase(Locale.getDefault())

// JSON has no equivalent in JDBCType
if ("json" == columnTypeName) {
json.set<JsonNode>(columnName, Jsons.deserializeExact(resultSet.getString(colIndex)))
} else {
super.copyToJsonField(resultSet, colIndex, json)
}
}
}
Loading

0 comments on commit a8dc68e

Please sign in to comment.