Skip to content

Commit

Permalink
implement T+D
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 30, 2024
1 parent 69a7a7a commit 87847f9
Show file tree
Hide file tree
Showing 16 changed files with 885 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,34 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerImageTag: 0.3.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mysql-strict-encrypt
githubIssueLabel: destination-mysql
icon: mysql.svg
license: ELv2
name: MySQL
normalizationConfig:
normalizationIntegrationType: mysql
normalizationRepository: airbyte/normalization-mysql
normalizationTag: 0.4.1
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
supportsDbt: true
tags:
- language:java
releases:
breakingChanges:
1.0.0:
message:
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
which provides better error handling, incremental delivery of data for large
syncs, and improved final table structures. To review the breaking changes,
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models,
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination
at their next sync. You can manually sync existing connections prior to
the next scheduled sync to start the upgrade early.
"
upgradeDeadline: "2024-05-15"
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"supported_destination_sync_modes": ["overwrite", "append", "append_dedup"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MySQL Destination Spec",
Expand Down Expand Up @@ -171,6 +171,13 @@
"description": "The database to write raw tables into",
"title": "Raw table database (defaults to airbyte_internal)",
"order": 7
},
"disable_type_dedupe": {
"type": "boolean",
"default": false,
"description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions",
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 8
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions airbyte-integrations/connectors/destination-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerImageTag: 0.3.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mysql
githubIssueLabel: destination-mysql
icon: mysql.svg
license: ELv2
name: MySQL
registries:
cloud:
dockerImageTag: 0.2.0
dockerRepository: airbyte/destination-mysql-strict-encrypt
enabled: true
oss:
dockerImageTag: 0.2.0
enabled: true
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
Expand All @@ -25,4 +23,20 @@ data:
sl: 100
ql: 200
supportLevel: community
releases:
breakingChanges:
1.0.0:
message:
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
which provides better error handling and improved final table structures. To review the breaking changes,
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models,
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination
at their next sync. You can manually sync existing connections prior to
the next scheduled sync to start the upgrade early."
upgradeDeadline: "2024-06-05"
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationV1V2Migrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlDestinationHandler;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlV1V2Migrator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,6 +64,12 @@ public class MySQLDestination extends AbstractJdbcDestination<MinimumDestination
"verifyServerCertificate", "false"),
DEFAULT_JDBC_PARAMETERS);

@Override
@NotNull
protected String getConfigSchemaKey() {
return JdbcUtils.DATABASE_KEY;
}

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MySQLDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
Expand Down Expand Up @@ -123,10 +130,9 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final String jdbcUrl = String.format("jdbc:mysql://%s:%s/%s",
final String jdbcUrl = String.format("jdbc:mysql://%s:%s",
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText());
config.get(JdbcUtils.PORT_KEY).asText());

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
Expand All @@ -147,18 +153,35 @@ protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new MysqlSqlGenerator();
}

@NotNull
@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
return new MysqlDestinationHandler(database, rawTableSchema);
}

@NotNull
@Override
protected List<Migration<MinimumDestinationState>> getMigrations(@NotNull JdbcDatabase database,
@NotNull String databaseName,
@NotNull SqlGenerator sqlGenerator,
@NotNull DestinationHandler<MinimumDestinationState> destinationHandler) {
return Collections.emptyList();
}

@Override
public boolean isV2Destination() {
return true;
protected DestinationV1V2Migrator getV1V2Migrator(JdbcDatabase database, String databaseName) {
return new MysqlV1V2Migrator(database);
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

@Override
public boolean isV2Destination() {
return true;
}

Expand All @@ -169,21 +192,4 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed destination: {}", MySQLDestination.class);
}

@NotNull
@Override
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@NotNull
@Override
protected List<Migration<MinimumDestinationState>> getMigrations(@NotNull JdbcDatabase database,
@NotNull String databaseName,
@NotNull SqlGenerator sqlGenerator,
@NotNull DestinationHandler<MinimumDestinationState> destinationHandler) {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
import io.airbyte.integrations.base.destination.typing_deduping.Array
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.Struct
import io.airbyte.integrations.base.destination.typing_deduping.Union
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import java.sql.DatabaseMetaData
import java.sql.ResultSet
import java.util.Optional
import org.jooq.DataType
import org.jooq.SQLDialect
import org.jooq.impl.DefaultDataType

class MysqlDestinationHandler(
jdbcDatabase: JdbcDatabase,
rawTableDatabaseName: String,
) :
JdbcDestinationHandler<MinimumDestinationState>(
// Mysql doesn't have an actual schema concept.
// Instead, we put each namespace into its own database.
null,
jdbcDatabase,
rawTableDatabaseName,
SQLDialect.MYSQL,
) {
override val stateTableUpdatedAtType: DataType<*> =
DefaultDataType(SQLDialect.MYSQL, String::class.java, "datetime")
override fun toJdbcTypeName(airbyteType: AirbyteType): String =
// This is mostly identical to the postgres implementation, but swaps jsonb to json
if (airbyteType is AirbyteProtocolType) {
Companion.toJdbcTypeName(airbyteType)
} else {
when (airbyteType.typeName) {
Struct.TYPE,
UnsupportedOneOf.TYPE,
Array.TYPE -> "json"
Union.TYPE -> toJdbcTypeName((airbyteType as Union).chooseType())
else -> throw IllegalArgumentException("Unsupported AirbyteType: $airbyteType")
}
}

override fun isAirbyteRawIdColumnMatch(existingTable: TableDefinition): Boolean =
// we create the raw_id column as a varchar rather than as text
"VARCHAR" == existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_RAW_ID]!!.type

override fun isAirbyteExtractedAtColumnMatch(existingTable: TableDefinition): Boolean =
// the raw table uses a real timestamp column for backwards-compatibility reasons
"TIMESTAMP" == existingTable.columns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT]!!.type

override fun toDestinationState(json: JsonNode): MinimumDestinationState =
MinimumDestinationState.Impl(
json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
)

// Mysql doesn't have schemas. Pass the namespace as the database name.
override fun findExistingTable(id: StreamId): Optional<TableDefinition> =
findExistingTable(jdbcDatabase, id.finalNamespace, null, id.finalName)

override fun getTableFromMetadata(dbmetadata: DatabaseMetaData, id: StreamId): ResultSet =
dbmetadata.getTables(id.rawNamespace, null, id.rawName, null)

companion object {
private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String =
when (airbyteProtocolType) {
AirbyteProtocolType.STRING -> "text"
AirbyteProtocolType.NUMBER -> "decimal"
AirbyteProtocolType.INTEGER -> "bigint"
AirbyteProtocolType.BOOLEAN -> "bit"
AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE -> "varchar"
AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE -> "datetime"
AirbyteProtocolType.TIME_WITH_TIMEZONE -> "varchar"
AirbyteProtocolType.TIME_WITHOUT_TIMEZONE -> "time"
AirbyteProtocolType.DATE -> "date"
AirbyteProtocolType.UNKNOWN -> "json"
}
}
}
Loading

0 comments on commit 87847f9

Please sign in to comment.