Skip to content

Commit

Permalink
mysql dv2 raw table impl
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 30, 2024
1 parent 55e7bf2 commit 69a7a7a
Show file tree
Hide file tree
Showing 58 changed files with 742 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -31,9 +31,3 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

@Disabled
public class MySQLStrictEncryptDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private MySQLContainer<?> db;
Expand Down Expand Up @@ -113,23 +114,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -162,19 +162,18 @@ private void grantCorrectPermissions() {
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
SQLDialect.MYSQL);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/mysql",
"supportsIncremental": true,
"supportsNormalization": true,
"supportsNormalization": false,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
Expand Down Expand Up @@ -165,6 +165,12 @@
}
}
]
},
"raw_data_schema": {
"type": "string",
"description": "The database to write raw tables into",
"title": "Raw table database (defaults to airbyte_internal)",
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -26,10 +26,5 @@ application {
dependencies {
implementation 'mysql:mysql-connector-java:8.0.22'
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
testFixturesApi libs.testcontainers.mysql
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
JunitMethodExecutionTimeout=15 m
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ data:
icon: mysql.svg
license: ELv2
name: MySQL
normalizationConfig:
normalizationIntegrationType: mysql
normalizationRepository: airbyte/normalization-mysql
normalizationTag: 0.4.3
registries:
cloud:
dockerImageTag: 0.2.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
Expand All @@ -30,13 +31,15 @@
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -141,14 +144,24 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
throw new UnsupportedOperationException("mysql does not yet support DV2");
return new MysqlSqlGenerator();
}

@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand All @@ -161,7 +174,7 @@ public static void main(final String[] args) throws Exception {
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
throw new UnsupportedOperationException("Mysql does not yet support DV2");
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@NotNull
Expand Down
Loading

0 comments on commit 69a7a7a

Please sign in to comment.