diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 452a4f05ae8b..88aa8e9afdea 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -193,8 +193,10 @@ abstract class BasicFunctionalityIntegrationTest( configUpdater = configUpdater, envVars = envVars, ) { - val parsedConfig = - ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents)) + + // Update config with any replacements. This may be necessary when using testcontainers. + val configAsString = configUpdater.update(configContents) + val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) @Test open fun testBasicWrite() { @@ -209,7 +211,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val messages = runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -260,7 +262,7 @@ abstract class BasicFunctionalityIntegrationTest( { if (verifyDataWriting) { dumpAndDiffRecords( - ValidatedJsonUtils.parseOne(configSpecClass, configContents), + ValidatedJsonUtils.parseOne(configSpecClass, configAsString), listOf( OutputRecord( extractedAt = 1234, @@ -321,7 +323,7 @@ abstract class BasicFunctionalityIntegrationTest( val messages = runSync( - configContents, + configAsString, stream, listOf( InputFile( @@ -359,7 +361,7 @@ abstract class BasicFunctionalityIntegrationTest( ) }) - val config = ValidatedJsonUtils.parseOne(configSpecClass, configContents) + val config = ValidatedJsonUtils.parseOne(configSpecClass, configAsString) val fileContent = dataDumper.dumpFile(config, stream) assertEquals(listOf("123"), fileContent) @@ -380,7 +382,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val stateMessage = runSyncUntilStateAck( - configContents, + this@BasicFunctionalityIntegrationTest.configContents, stream, listOf( InputRecord( @@ -398,7 +400,7 @@ abstract class BasicFunctionalityIntegrationTest( ), allowGracefulShutdown = false, ) - runSync(configContents, stream, emptyList()) + runSync(this@BasicFunctionalityIntegrationTest.configContents, stream, emptyList()) val streamName = stateMessage.stream.streamDescriptor.name val streamNamespace = stateMessage.stream.streamDescriptor.namespace @@ -461,7 +463,7 @@ abstract class BasicFunctionalityIntegrationTest( val stream1 = makeStream(randomizedNamespace + "_1") val stream2 = makeStream(randomizedNamespace + "_2") runSync( - configContents, + configAsString, DestinationCatalog( listOf( stream1, @@ -584,7 +586,7 @@ abstract class BasicFunctionalityIntegrationTest( serialized = "", ) } - runSync(configContents, catalog, messages) + runSync(configAsString, catalog, messages) assertAll( catalog.streams.map { stream -> { @@ -624,7 +626,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(generationId = 12, minimumGenerationId = 0, syncId = 42), listOf( InputRecord( @@ -637,7 +639,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(generationId = 13, minimumGenerationId = 13, syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -719,7 +721,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -758,7 +760,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a status incomplete. This should not delete any existing data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -807,7 +809,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a third sync, this time with a successful status. // This should delete the first sync's data, and retain the second+third syncs' data. runSync( - configContents, + configAsString, stream2, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -888,7 +890,7 @@ abstract class BasicFunctionalityIntegrationTest( ) // Run a sync, but emit a stream status incomplete. runSyncUntilStateAck( - configContents, + configAsString, stream, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -923,7 +925,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a second sync, this time with a successful status. // This should retain the first syncs' data. runSync( - configContents, + configAsString, stream, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -1009,7 +1011,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 41, ) runSync( - configContents, + configAsString, stream1, listOf( makeInputRecord(1, "2024-01-23T01:00:00Z", 100), @@ -1049,7 +1051,7 @@ abstract class BasicFunctionalityIntegrationTest( // Run a sync, but emit a stream status incomplete. This should not delete any existing // data. runSyncUntilStateAck( - configContents, + configAsString, stream2, listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), StreamCheckpoint( @@ -1104,7 +1106,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 43, ) runSync( - configContents, + configAsString, stream3, listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) @@ -1164,7 +1166,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream(syncId = 42), listOf( InputRecord( @@ -1177,7 +1179,7 @@ abstract class BasicFunctionalityIntegrationTest( ) val finalStream = makeStream(syncId = 43) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -1230,7 +1232,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId, ) runSync( - configContents, + configAsString, makeStream( syncId = 42, linkedMapOf("id" to intType, "to_drop" to stringType, "to_change" to intType) @@ -1250,7 +1252,7 @@ abstract class BasicFunctionalityIntegrationTest( linkedMapOf("id" to intType, "to_change" to stringType, "to_add" to stringType) ) runSync( - configContents, + configAsString, finalStream, listOf( InputRecord( @@ -1324,7 +1326,7 @@ abstract class BasicFunctionalityIntegrationTest( val sync1Stream = makeStream(syncId = 42) runSync( - configContents, + configAsString, sync1Stream, listOf( // emitted_at:1000 is equal to 1970-01-01 00:00:01Z. @@ -1385,7 +1387,7 @@ abstract class BasicFunctionalityIntegrationTest( val sync2Stream = makeStream(syncId = 43) runSync( - configContents, + configAsString, sync2Stream, listOf( // Update both Alice and Bob @@ -1469,9 +1471,9 @@ abstract class BasicFunctionalityIntegrationTest( // instead of being able to fallback onto extractedAt. emittedAtMs = 100, ) - runSync(configContents, makeStream("cursor1"), listOf(makeRecord("cursor1"))) + runSync(configAsString, makeStream("cursor1"), listOf(makeRecord("cursor1"))) val stream2 = makeStream("cursor2") - runSync(configContents, stream2, listOf(makeRecord("cursor2"))) + runSync(configAsString, stream2, listOf(makeRecord("cursor2"))) dumpAndDiffRecords( parsedConfig, listOf( @@ -1528,7 +1530,7 @@ abstract class BasicFunctionalityIntegrationTest( ) } // Just verify that we don't crash. - assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) } + assertDoesNotThrow { runSync(configAsString, DestinationCatalog(streams), messages) } } /** @@ -1581,7 +1583,7 @@ abstract class BasicFunctionalityIntegrationTest( emittedAtMs = 100, ) runSync( - configContents, + configAsString, stream, listOf( // A record with valid values for all fields @@ -1865,7 +1867,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2044,7 +2046,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2215,7 +2217,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( @@ -2407,7 +2409,7 @@ abstract class BasicFunctionalityIntegrationTest( syncId = 42, ) runSync( - configContents, + configAsString, stream, listOf( InputRecord( diff --git a/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts b/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts index 4cc0467ff7bd..27db447bf6b3 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts +++ b/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts @@ -22,7 +22,7 @@ application { //applicationDefaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "java.base/sun.security.action=ALL-UNNAMED", "--add-opens", "java.base/java.lang=ALL-UNNAMED") } -val junitVersion = "5.11.3" +val junitVersion = "5.11.4" configurations.configureEach { // Exclude additional SLF4J providers from all classpaths @@ -38,13 +38,16 @@ dependencies { implementation("com.microsoft.sqlserver:mssql-jdbc:12.8.1.jre11") implementation("io.github.oshai:kotlin-logging-jvm:7.0.0") implementation("jakarta.inject:jakarta.inject-api:2.0.1") - implementation("com.github.spotbugs:spotbugs-annotations:4.8.6") - implementation("io.micronaut:micronaut-inject:4.6.1") + implementation("com.github.spotbugs:spotbugs-annotations:4.9.0") + implementation("io.micronaut:micronaut-inject:4.7.12") + implementation("com.zaxxer:HikariCP:6.2.1") - testImplementation("io.mockk:mockk:1.13.13") + testImplementation("io.mockk:mockk:1.13.16") testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion") testImplementation("org.junit.jupiter:junit-jupiter-params:$junitVersion") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:$junitVersion") + + integrationTestImplementation("org.testcontainers:mssqlserver:1.20.4") } tasks.named("test") { diff --git a/airbyte-integrations/connectors/destination-mssql-v2/docker-compose.yml b/airbyte-integrations/connectors/destination-mssql-v2/docker-compose.yml new file mode 100644 index 000000000000..4ba24a979759 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/docker-compose.yml @@ -0,0 +1,8 @@ +services: + sql-server: + image: mcr.microsoft.com/mssql/server:2022-latest + ports: + - "1433:1433" + environment: + - ACCEPT_EULA=Y + - MSSQL_SA_PASSWORD=A_Str0ng_Required_Password diff --git a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml index 4ebc86cc3d6d..2eb76eb2ebf3 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml @@ -16,7 +16,7 @@ data: type: GSM connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 dockerRepository: airbyte/destination-mssql-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2 githubIssueLabel: destination-mssql-v2 diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLChecker.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLChecker.kt new file mode 100644 index 000000000000..5f7b8c2aecb9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLChecker.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.check.DestinationChecker +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLDataSourceFactory +import jakarta.inject.Singleton +import java.util.UUID +import javax.sql.DataSource + +@Singleton +class MSSQLChecker(private val dataSourceFactory: MSSQLDataSourceFactory) : + DestinationChecker { + override fun check(config: MSSQLConfiguration) { + val dataSource: DataSource = dataSourceFactory.getDataSource(config) + val testTableName = "check_test_${UUID.randomUUID()}" + val fullyQualifiedTableName = "[${config.rawDataSchema}].[${testTableName}]" + dataSource.connection.use { connection -> + connection.createStatement().use { statement -> + statement.executeUpdate( + """ + CREATE TABLE ${fullyQualifiedTableName} (test int); + DROP TABLE ${fullyQualifiedTableName}; + """.trimIndent(), + ) + } + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt new file mode 100644 index 000000000000..86d0d596a808 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLQueryBuilder.kt @@ -0,0 +1,392 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import com.microsoft.sqlserver.jdbc.SQLServerException +import io.airbyte.cdk.load.command.Append +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.Overwrite +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration +import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteTypeToSqlType +import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setAsNullValue +import io.airbyte.integrations.destination.mssql.v2.convert.AirbyteValueToStatement.Companion.setValue +import io.airbyte.integrations.destination.mssql.v2.convert.MssqlType +import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.getAirbyteNamedValue +import io.airbyte.integrations.destination.mssql.v2.convert.SqlTypeToMssqlType +import io.airbyte.protocol.models.Jsons +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange +import java.lang.ArithmeticException +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.util.UUID + +fun String.executeQuery(connection: Connection, vararg args: String, f: (ResultSet) -> T): T { + connection.prepareStatement(this.trimIndent()).use { statement -> + args.forEachIndexed { index, arg -> statement.setString(index + 1, arg) } + return statement.executeQuery().use(f) + } +} + +fun String.executeUpdate(connection: Connection, vararg args: String) { + connection.prepareStatement(this.trimIndent()).use { statement -> + args.forEachIndexed { index, arg -> statement.setString(index + 1, arg) } + statement.executeUpdate() + } +} + +fun String.toQuery(vararg args: String): String = this.trimIndent().replace("?", "%s").format(*args) + +const val GET_EXISTING_SCHEMA_QUERY = + """ + SELECT COLUMN_NAME, DATA_TYPE + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? + ORDER BY ORDINAL_POSITION ASC + """ + +const val CREATE_SCHEMA_QUERY = + """ + DECLARE @Schema VARCHAR(MAX) = ? + IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = @Schema) + BEGIN + EXEC ('CREATE SCHEMA ' + @Schema); + END + """ + +const val ALTER_TABLE_ADD = """ + ALTER TABLE ? + ADD ? ? NULL; + """ +const val ALTER_TABLE_DROP = """ + ALTER TABLE ? + DROP COLUMN ?; + """ +const val ALTER_TABLE_MODIFY = """ + ALTER TABLE ? + ALTER COLUMN ? ? NULL; + """ + +const val DELETE_WHERE_COL_IS_NOT_NULL = + """ + DELETE FROM ? + WHERE ? is not NULL + """ + +const val DELETE_WHERE_COL_LESS_THAN = """ + DELETE FROM ? + WHERE ? < ? + """ + +const val SELECT_FROM = """ + SELECT * + FROM ? + """ + +class MSSQLQueryBuilder( + config: MSSQLConfiguration, + private val stream: DestinationStream, +) { + companion object { + + const val SQL_ERROR_OBJECT_EXISTS = 2714 + + const val AIRBYTE_RAW_ID = "_airbyte_raw_id" + const val AIRBYTE_EXTRACTED_AT = "_airbyte_extracted_at" + const val AIRBYTE_META = "_airbyte_meta" + const val AIRBYTE_GENERATION_ID = "_airbyte_generation_id" + const val AIRBYTE_CDC_DELETED_AT = "_ab_cdc_deleted_at" + const val DEFAULT_SEPARATOR = ",\n " + + val airbyteFinalTableFields = + listOf( + NamedField(AIRBYTE_RAW_ID, FieldType(StringType, false)), + NamedField(AIRBYTE_EXTRACTED_AT, FieldType(IntegerType, false)), + NamedField(AIRBYTE_META, FieldType(ObjectTypeWithoutSchema, false)), + NamedField(AIRBYTE_GENERATION_ID, FieldType(IntegerType, false)), + ) + + val airbyteFields = airbyteFinalTableFields.map { it.name }.toSet() + + private fun AirbyteRecordMessageMeta.trackChange( + fieldName: String, + change: AirbyteRecordMessageMetaChange.Change, + reason: AirbyteRecordMessageMetaChange.Reason, + ) { + this.changes.add( + AirbyteRecordMessageMetaChange() + .withField(fieldName) + .withChange(change) + .withReason(reason) + ) + } + } + + data class NamedField(val name: String, val type: FieldType) + data class NamedValue(val name: String, val value: AirbyteValue) + data class NamedSqlField(val name: String, val type: MssqlType) + + private val outputSchema: String = stream.descriptor.namespace ?: config.schema + private val tableName: String = stream.descriptor.name + val fqTableName = "$outputSchema.$tableName" + private val uniquenessKey: List = + when (stream.importType) { + is Dedupe -> + if ((stream.importType as Dedupe).primaryKey.isNotEmpty()) { + (stream.importType as Dedupe).primaryKey.map { it.joinToString(".") } + } else { + listOf((stream.importType as Dedupe).cursor.joinToString(".")) + } + Append -> emptyList() + Overwrite -> emptyList() + } + + private val toSqlType = AirbyteTypeToSqlType() + private val toMssqlType = SqlTypeToMssqlType() + + val finalTableSchema: List = + airbyteFinalTableFields + extractFinalTableSchema(stream.schema) + val hasCdc: Boolean = finalTableSchema.any { it.name == AIRBYTE_CDC_DELETED_AT } + + private fun getExistingSchema(connection: Connection): List { + val fields = mutableListOf() + GET_EXISTING_SCHEMA_QUERY.executeQuery(connection, outputSchema, tableName) { rs -> + while (rs.next()) { + val name = rs.getString("COLUMN_NAME") + val type = MssqlType.valueOf(rs.getString("DATA_TYPE").uppercase()) + fields.add(NamedSqlField(name, type)) + } + } + return fields + } + + private fun getSchema(): List = + finalTableSchema.map { + NamedSqlField(it.name, toMssqlType.convert(toSqlType.convert(it.type.type))) + } + + fun updateSchema(connection: Connection) { + val existingSchema = getExistingSchema(connection) + val expectedSchema = getSchema() + + val existingFields = existingSchema.associate { it.name to it.type } + val expectedFields = expectedSchema.associate { it.name to it.type } + + if (existingFields == expectedFields) { + return + } + + val toDelete = existingFields.filter { it.key !in expectedFields } + val toAdd = expectedFields.filter { it.key !in existingFields } + val toAlter = + expectedFields.filter { it.key in existingFields && it.value != existingFields[it.key] } + + val query = + StringBuilder() + .apply { + toDelete.entries.forEach { + appendLine(ALTER_TABLE_DROP.toQuery(fqTableName, it.key)) + } + toAdd.entries.forEach { + appendLine(ALTER_TABLE_ADD.toQuery(fqTableName, it.key, it.value.sqlString)) + } + toAlter.entries.forEach { + appendLine( + ALTER_TABLE_MODIFY.toQuery(fqTableName, it.key, it.value.sqlString) + ) + } + } + .toString() + + query.executeUpdate(connection) + } + + fun createTableIfNotExists(connection: Connection) { + try { + CREATE_SCHEMA_QUERY.executeUpdate(connection, outputSchema) + } catch (e: SQLServerException) { + // MSSQL create schema if not exists isn't atomic. Ignoring this error when it happens. + if (e.sqlServerError.errorNumber != SQL_ERROR_OBJECT_EXISTS) { + throw e + } + } + + connection.createStatement().use { + it.executeUpdate(createTableIfNotExists(fqTableName, finalTableSchema)) + } + } + + fun getFinalTableInsertColumnHeader(): String = + getFinalTableInsertColumnHeader(fqTableName, finalTableSchema) + + fun deleteCdc(connection: Connection) = + DELETE_WHERE_COL_IS_NOT_NULL.toQuery(fqTableName, AIRBYTE_CDC_DELETED_AT) + .executeUpdate(connection) + + fun deletePreviousGenerations(connection: Connection, minGenerationId: Long) = + DELETE_WHERE_COL_LESS_THAN.toQuery( + fqTableName, + AIRBYTE_GENERATION_ID, + minGenerationId.toString() + ) + .executeUpdate(connection) + + fun populateStatement( + statement: PreparedStatement, + record: DestinationRecordAirbyteValue, + schema: List + ) { + val recordObject = record.data as ObjectValue + var airbyteMetaStatementIndex: Int? = null + val airbyteMeta = + AirbyteRecordMessageMeta().apply { + changes = + record.meta?.changes?.map { it.asProtocolObject() }?.toMutableList() + ?: mutableListOf() + setAdditionalProperty("syncId", stream.syncId) + } + + schema.forEachIndexed { index, field -> + val statementIndex = index + 1 + if (field.name in airbyteFields) { + when (field.name) { + AIRBYTE_RAW_ID -> + statement.setString(statementIndex, UUID.randomUUID().toString()) + AIRBYTE_EXTRACTED_AT -> statement.setLong(statementIndex, record.emittedAtMs) + AIRBYTE_GENERATION_ID -> statement.setLong(statementIndex, stream.generationId) + AIRBYTE_META -> airbyteMetaStatementIndex = statementIndex + } + } else { + try { + val value = recordObject.values[field.name] + statement.setValue(statementIndex, value, field) + } catch (e: Exception) { + statement.setAsNullValue(statementIndex, field.type.type) + when (e) { + is ArithmeticException -> + airbyteMeta.trackChange( + field.name, + AirbyteRecordMessageMetaChange.Change.NULLED, + AirbyteRecordMessageMetaChange.Reason + .DESTINATION_FIELD_SIZE_LIMITATION, + ) + else -> + airbyteMeta.trackChange( + field.name, + AirbyteRecordMessageMetaChange.Change.NULLED, + AirbyteRecordMessageMetaChange.Reason + .DESTINATION_SERIALIZATION_ERROR, + ) + } + } + } + } + airbyteMetaStatementIndex?.let { statementIndex -> + if (airbyteMeta.changes.isEmpty()) { + airbyteMeta.changes = null + } + statement.setString(statementIndex, Jsons.serialize(airbyteMeta)) + } + } + + fun readResult(rs: ResultSet, schema: List): ObjectValue { + val valueMap = + schema + .filter { field -> field.name !in airbyteFields } + .map { field -> rs.getAirbyteNamedValue(field) } + .associate { it.name to it.value } + return ObjectValue.from(valueMap) + } + + private fun createTableIfNotExists(fqTableName: String, schema: List): String { + val index = + if (uniquenessKey.isNotEmpty()) + createIndex(fqTableName, uniquenessKey, clustered = false) + else "" + val cdcIndex = if (hasCdc) createIndex(fqTableName, listOf(AIRBYTE_CDC_DELETED_AT)) else "" + + return """ + IF OBJECT_ID('$fqTableName') IS NULL + BEGIN + CREATE TABLE $fqTableName + ( + ${airbyteTypeToSqlSchema(schema)} + ); + $index; + $cdcIndex; + END + """.trimIndent() + } + + private fun createIndex( + fqTableName: String, + columns: List, + clustered: Boolean = false + ): String { + val name = "${fqTableName.replace('.', '_')}_${columns.hashCode()}" + val indexType = if (clustered) "CLUSTERED" else "" + return "CREATE $indexType INDEX $name ON $fqTableName (${columns.joinToString(", ")})" + } + + private fun getFinalTableInsertColumnHeader( + fqTableName: String, + schema: List + ): String { + val columns = schema.joinToString(", ") { it.name } + val templateColumns = schema.joinToString(", ") { "?" } + return if (uniquenessKey.isEmpty()) { + """ + INSERT INTO $fqTableName ($columns) + SELECT table_value.* + FROM (VALUES ($templateColumns)) table_value($columns) + """ + } else { + val uniquenessConstraint = + uniquenessKey.joinToString(" AND ") { "Target.$it = Source.$it" } + val updateStatement = schema.joinToString(", ") { "${it.name} = Source.${it.name}" } + """ + MERGE INTO $fqTableName AS Target + USING (VALUES ($templateColumns)) AS Source ($columns) + ON $uniquenessConstraint + WHEN MATCHED THEN + UPDATE SET $updateStatement + WHEN NOT MATCHED BY TARGET THEN + INSERT ($columns) VALUES ($columns) + ; + """.trimIndent() + } + } + + private fun extractFinalTableSchema(schema: AirbyteType): List = + when (schema) { + is ObjectType -> { + (stream.schema as ObjectType) + .properties + .map { NamedField(name = it.key, type = it.value) } + .toList() + } + else -> TODO("most likely fail hard") + } + + private fun airbyteTypeToSqlSchema( + schema: List, + separator: String = DEFAULT_SEPARATOR + ): String { + return schema.joinToString(separator = separator) { + "${it.name} ${toMssqlType.convert(toSqlType.convert(it.type.type)).sqlString} NULL" + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLStreamLoader.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLStreamLoader.kt new file mode 100644 index 000000000000..9e46466c2b82 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLStreamLoader.kt @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.SimpleBatch +import io.airbyte.cdk.load.state.StreamProcessingFailed +import io.airbyte.cdk.load.write.StreamLoader +import io.github.oshai.kotlinlogging.KotlinLogging +import javax.sql.DataSource + +private val log = KotlinLogging.logger {} + +class MSSQLStreamLoader( + private val dataSource: DataSource, + override val stream: DestinationStream, + private val sqlBuilder: MSSQLQueryBuilder, +) : StreamLoader { + + override suspend fun start() { + ensureTableExists(dataSource) + } + + override suspend fun close(streamFailure: StreamProcessingFailed?) { + if (streamFailure == null) { + truncatePreviousGenerations(dataSource) + } + super.close(streamFailure) + } + + override suspend fun processRecords( + records: Iterator, + totalSizeBytes: Long, + endOfStream: Boolean + ): Batch { + dataSource.connection.use { connection -> + val statement = + connection.prepareStatement(sqlBuilder.getFinalTableInsertColumnHeader()) + records.forEach { record -> + sqlBuilder.populateStatement(statement, record, sqlBuilder.finalTableSchema) + statement.addBatch() + } + statement.executeLargeBatch() + if (sqlBuilder.hasCdc) { + sqlBuilder.deleteCdc(connection) + } + connection.commit() + } + return SimpleBatch(Batch.State.COMPLETE) + } + + private fun ensureTableExists(dataSource: DataSource) { + try { + dataSource.connection.use { connection -> + sqlBuilder.createTableIfNotExists(connection) + sqlBuilder.updateSchema(connection) + } + } catch (ex: Exception) { + log.error(ex) { ex.message } + throw ex + } + } + + private fun truncatePreviousGenerations(dataSource: DataSource) { + // TODO this can be improved to avoid attempting to truncate the data for each sync + dataSource.connection.use { connection -> + sqlBuilder.deletePreviousGenerations(connection, stream.minimumGenerationId) + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt new file mode 100644 index 000000000000..7fdf37e5cf75 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.state.DestinationFailure +import io.airbyte.cdk.load.write.DestinationWriter +import io.airbyte.cdk.load.write.StreamLoader +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLDataSourceFactory +import jakarta.inject.Singleton +import javax.sql.DataSource + +@Singleton +class MSSQLWriter( + private val config: MSSQLConfiguration, + private val dataSourceFactory: MSSQLDataSourceFactory +) : DestinationWriter { + private var dataSource: DataSource? = null + + override fun createStreamLoader(stream: DestinationStream): StreamLoader { + val sqlBuilder = MSSQLQueryBuilder(config, stream) + return MSSQLStreamLoader( + dataSource = dataSource + ?: throw IllegalStateException("dataSource hasn't been initialized"), + stream = stream, + sqlBuilder = sqlBuilder, + ) + } + + override suspend fun setup() { + super.setup() + dataSource = dataSourceFactory.getDataSource(config) + } + + override suspend fun teardown(destinationFailure: DestinationFailure?) { + dataSource?.let { dataSourceFactory.disposeDataSource(it) } + super.teardown(destinationFailure) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt new file mode 100644 index 000000000000..216f0773157d --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactory.kt @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.config + +import com.microsoft.sqlserver.jdbc.SQLServerDataSource +import com.zaxxer.hikari.HikariDataSource +import io.micronaut.context.ApplicationContext +import io.micronaut.context.annotation.Factory +import jakarta.inject.Singleton +import javax.sql.DataSource + +@Factory +class DataSourceFactory { + + @Singleton + fun dataSource(config: MSSQLConfiguration): DataSource { + val sqlServerDataSource = config.toSQLServerDataSource() + val dataSource = HikariDataSource() + dataSource.dataSource = sqlServerDataSource + dataSource.connectionTimeout = 30000 + dataSource.connectionTestQuery = "SELECT 1" + dataSource.maximumPoolSize = 10 + dataSource.minimumIdle = 0 + dataSource.idleTimeout = 60000 + dataSource.leakDetectionThreshold = dataSource.connectionTimeout + 10000 + return dataSource + } +} + +fun MSSQLConfiguration.toSQLServerDataSource(): SQLServerDataSource { + val connectionString = + StringBuilder() + .apply { + append("jdbc:sqlserver://${host}:${port};databaseName=${database}") + + when (sslMethod) { + is EncryptedVerify -> { + append(";encrypt=true") + sslMethod.trustStoreName?.let { append(";trustStoreName=$it") } + sslMethod.trustStorePassword?.let { append(";trustStorePassword=$it") } + sslMethod.hostNameInCertificate?.let { + append(";hostNameInCertificate=$it") + } + } + is EncryptedTrust -> { + append(";encrypt=true;trustServerCertificate=true") + } + is Unencrypted -> { + append(";encrypt=false") + } + } + + jdbcUrlParams?.let { append(";$it") } + } + .toString() + + return SQLServerDataSource().also { + it.url = connectionString + it.user = user + password?.let(it::setPassword) + } +} + +// Indirection to abstract the fact that we are leveraging micronaut to manage the datasource +// and avoid clients interacting directly with the application context to retrieve a datasource. +@Singleton +class MSSQLDataSourceFactory(private val applicationContext: ApplicationContext) { + fun getDataSource(config: MSSQLConfiguration): DataSource = + applicationContext.createBean(DataSource::class.java, config) + + fun disposeDataSource(dataSource: DataSource) { + applicationContext.destroyBean(dataSource) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt index ca468c08caa7..14b59856d7b8 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt @@ -4,18 +4,45 @@ package io.airbyte.integrations.destination.mssql.v2.config -import dagger.Component.Factory import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationConfigurationFactory +import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton -data class MSSQLConfiguration(val placeholder: String) : DestinationConfiguration() +data class MSSQLConfiguration( + val host: String, + val port: Int, + val database: String, + val schema: String, + val user: String?, + val password: String?, + val jdbcUrlParams: String?, + val rawDataSchema: String, + val sslMethod: EncryptionMethod, +) : DestinationConfiguration() @Singleton class MSSQLConfigurationFactory : DestinationConfigurationFactory { override fun makeWithoutExceptionHandling(pojo: MSSQLSpecification): MSSQLConfiguration { - TODO("Not yet implemented") + return makeWithOverrides(spec = pojo) + } + + fun makeWithOverrides( + spec: MSSQLSpecification, + overrides: Map = emptyMap() + ): MSSQLConfiguration { + return MSSQLConfiguration( + host = overrides.getOrDefault("host", spec.host), + port = overrides.getOrDefault("port", spec.port.toString()).toInt(), + database = overrides.getOrDefault("database", spec.database), + schema = overrides.getOrDefault("schema", spec.schema), + user = overrides.getOrDefault("user", spec.user), + password = overrides.getOrDefault("password", spec.password), + jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams), + rawDataSchema = overrides.getOrDefault("rawDataSchema", spec.rawDataSchema), + sslMethod = spec.sslMethod, + ) } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt index 30a7f9769c63..ee23950dec85 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt @@ -4,7 +4,14 @@ package io.airbyte.integrations.destination.mssql.v2.config +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyDescription +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.load.spec.DestinationSpecificationExtension import io.airbyte.protocol.models.v0.DestinationSyncMode @@ -12,10 +19,129 @@ import jakarta.inject.Singleton @Singleton @JsonSchemaTitle("MSSQL V2 Destination Specification") -class MSSQLSpecification : ConfigurationSpecification() {} +@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION") +class MSSQLSpecification : ConfigurationSpecification() { + @get:JsonSchemaTitle("Host") + @get:JsonPropertyDescription("The host name of the MSSQL database.") + @get:JsonProperty("host") + @get:JsonSchemaInject(json = """{"order":0}""") + val host: String = "" + + @get:JsonSchemaTitle("Port") + @get:JsonPropertyDescription("The port of the MSSQL database.") + @get:JsonProperty("port") + @get:JsonSchemaInject(json = """{"minimum":0,"maximum":65536,"examples":["1433"],"order":1}""") + val port: Int = 1433 + + @get:JsonSchemaTitle("DB Name") + @get:JsonPropertyDescription("The name of the MSSQL database.") + @get:JsonProperty("database") + @get:JsonSchemaInject(json = """{"order":2}""") + val database: String = "" + + @get:JsonSchemaTitle("Default Schema") + @get:JsonPropertyDescription( + "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"public\"." + ) + @get:JsonProperty("schema") + @get:JsonSchemaInject(json = """{"examples":["public"],"default":"public","order":3}""") + val schema: String = "public" + + @get:JsonSchemaTitle("User") + @get:JsonPropertyDescription("The username which is used to access the database.") + @get:JsonProperty("user") + @get:JsonSchemaInject(json = """{"order":4}""") + val user: String? = null + + @get:JsonSchemaTitle("Password") + @get:JsonPropertyDescription("The password associated with this username.") + @get:JsonProperty("password") + @get:JsonSchemaInject(json = """{"airbyte_secret":true,"order":5}""") + val password: String? = null + + @get:JsonSchemaTitle("JDBC URL Params") + @get:JsonPropertyDescription( + "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)." + ) + @get:JsonProperty("jdbc_url_params") + @get:JsonSchemaInject(json = """{"order":6}""") + val jdbcUrlParams: String? = null + + @get:JsonSchemaTitle("Raw Table Schema Name") + @get:JsonPropertyDescription("The schema to write raw tables into (default: airbyte_internal)") + @get:JsonProperty("raw_data_schema") + @get:JsonSchemaInject(json = """{"default":"airbyte_internal","order":5}""") + val rawDataSchema: String = "airbyte_internal" + + @get:JsonSchemaTitle("SSL Method") + @get:JsonPropertyDescription( + "The encryption method which is used to communicate with the database." + ) + @get:JsonProperty("ssl_method") + @get:JsonSchemaInject(json = """{"order":8}""") + lateinit var sslMethod: EncryptionMethod +} + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "name") +@JsonSubTypes( + JsonSubTypes.Type(value = Unencrypted::class, name = Unencrypted.NAME), + JsonSubTypes.Type(value = EncryptedTrust::class, name = EncryptedTrust.NAME), + JsonSubTypes.Type(value = EncryptedVerify::class, name = EncryptedVerify.NAME), +) +sealed interface EncryptionMethod { + @get:JsonProperty("name") val name: String +} + +@JsonSchemaTitle("Unencrypted") +@JsonSchemaDescription("The data transfer will not be encrypted.") +class Unencrypted : EncryptionMethod { + companion object { + const val NAME = "unencrypted" + } + override val name: String = NAME +} + +@JsonSchemaTitle("Encrypted (trust server certificate)") +@JsonSchemaDescription( + "Use the certificate provided by the server without verification. (For testing purposes only!)" +) +class EncryptedTrust : EncryptionMethod { + companion object { + const val NAME = "encrypted_trust_server_certificate" + } + override val name: String = NAME +} + +@JsonSchemaTitle("Encrypted (verify certificate)") +@JsonSchemaDescription("Verify and use the certificate provided by the server.") +class EncryptedVerify( + @get:JsonSchemaTitle("Trust Store Name") + @get:JsonPropertyDescription("Specifies the name of the trust store.") + @get:JsonProperty("trustStoreName") + @get:JsonSchemaInject(json = """{"order":1}""") + val trustStoreName: String? = null, + @get:JsonSchemaTitle("Trust Store Password") + @get:JsonPropertyDescription("Specifies the password of the trust store.") + @get:JsonProperty("trustStorePassword") + @get:JsonSchemaInject(json = """{"airbyte_secret":true,"order":2}""") + val trustStorePassword: String? = null, + @get:JsonSchemaTitle("Host Name In Certificate") + @get:JsonPropertyDescription( + "Specifies the host name of the server. The value of this property must match the subject property of the certificate." + ) + @get:JsonProperty("hostNameInCertificate") + @get:JsonSchemaInject(json = """{"order":3}""") + val hostNameInCertificate: String? = null, +) : EncryptionMethod { + companion object { + const val NAME = "encrypted_verify_certificate" + } + override val name: String = NAME +} @Singleton class MSSQLSpecificationExtension : DestinationSpecificationExtension { + override val supportedSyncModes = listOf( DestinationSyncMode.OVERWRITE, diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt new file mode 100644 index 000000000000..e09ae2b86d10 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn +import io.airbyte.integrations.destination.mssql.v2.model.SqlTable +import java.sql.Types + +/** CDK pipeline [AirbyteType] to SQL [Types] converter. */ +class AirbyteTypeToSqlType { + + /** + * Converts an [AirbyteType] to the associated SQL [Types] value. + * + * @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType] + * @return The associated SQL [Types] value. + * @throws IllegalArgumentException if the [AirbyteType] is not supported. + */ + fun convert(airbyteSchema: AirbyteType): Int { + return when (airbyteSchema) { + is ObjectType -> Types.LONGVARCHAR + is ArrayType -> Types.LONGVARCHAR + is ArrayTypeWithoutSchema -> Types.LONGVARCHAR + is BooleanType -> Types.BOOLEAN + is DateType -> Types.DATE + is IntegerType -> Types.BIGINT + is NumberType -> Types.DECIMAL + is ObjectTypeWithEmptySchema -> Types.LONGVARCHAR + is ObjectTypeWithoutSchema -> Types.LONGVARCHAR + is StringType -> Types.VARCHAR + is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE + is TimeTypeWithoutTimezone -> Types.TIME + is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE + is TimestampTypeWithoutTimezone -> Types.TIMESTAMP + is UnionType -> Types.LONGVARCHAR + is UnknownType -> Types.LONGVARCHAR + } + } +} + +/** + * Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a + * SQL table. + * + * @param primaryKeys The list of configured primary key properties that should be treated as + * primary keys in the generated [SqlTable] + * @return The [SqlTable] that represents the table to be mapped to the stream represented by the + * [ObjectType]. + */ +fun ObjectType.toSqlTable(primaryKeys: List>): SqlTable { + val identifierFieldNames = primaryKeys.flatten().toSet() + val sqlTypeConverter = AirbyteTypeToSqlType() + val columns = + this.properties.entries.map { (name, field) -> + val isPrimaryKey = identifierFieldNames.contains(name) + val isNullable = !isPrimaryKey && field.nullable + SqlColumn( + name = name, + type = sqlTypeConverter.convert(field.type), + isPrimaryKey = isPrimaryKey, + isNullable = isNullable + ) + } + return SqlTable(columns = columns) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt new file mode 100644 index 000000000000..4104ae81df5f --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.cdk.load.util.serializeToJsonBytes +import io.airbyte.integrations.destination.mssql.v2.model.SqlTable +import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRow +import io.airbyte.integrations.destination.mssql.v2.model.SqlTableRowValue +import java.sql.Date +import java.sql.Time +import java.sql.Timestamp + +/** CDK pipeline [AirbyteValue] to SQL values converter. */ +class AirbyteValueToSqlValue { + + /** + * Converts an [AirbyteValue] to the associated SQL value. + * + * @param airbyteValue The [AirbyteValue] from an Airbyte record + * @return The corresponding SQL value for the given [AirbyteValue]. + * @throws IllegalArgumentException if the [AirbyteValue] is not supported. + */ + fun convert(airbyteValue: AirbyteValue): Any? { + return when (airbyteValue) { + is ObjectValue -> { + val convertedValues = + airbyteValue.values.entries.associate { (name, value) -> + name to convert(value) + } + convertedValues + } + is ArrayValue -> airbyteValue.values.map { convert(it) } + is BooleanValue -> airbyteValue.value + is DateValue -> Date.valueOf(airbyteValue.value) + is IntegerValue -> airbyteValue.value + is NullValue -> null + is NumberValue -> airbyteValue.value.toDouble().toBigDecimal() + is StringValue -> airbyteValue.value + is UnknownValue -> airbyteValue.value.serializeToJsonBytes() + is TimeWithTimezoneValue -> Time.valueOf(airbyteValue.value.toLocalTime()) + is TimeWithoutTimezoneValue -> Time.valueOf(airbyteValue.value) + is TimestampWithTimezoneValue -> Timestamp.valueOf(airbyteValue.value.toLocalDateTime()) + is TimestampWithoutTimezoneValue -> Timestamp.valueOf(airbyteValue.value) + } + } +} + +/** + * Extension function that converts an [ObjectValue] into a row of SQL values. + * + * @param sqlTable The [SqlTable] that contains data type information for each column. This is used + * to filter the [ObjectValue]'s values to only those that exist in the table. + * @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from + * the provided [ObjectValue]. + */ +fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow { + val converter = AirbyteValueToSqlValue() + return SqlTableRow( + values = + this.values + .filter { (name, _) -> sqlTable.columns.find { it.name == name } != null } + .map { (name, value) -> + val dataType = sqlTable.columns.find { it.name == name }!!.type + val converted = + when (value) { + is ObjectValue -> + (converter.convert(value) as LinkedHashMap<*, *>) + .serializeToJsonBytes() + is ArrayValue -> + (converter.convert(value) as List<*>).serializeToJsonBytes() + else -> converter.convert(value) + } + SqlTableRowValue(name = name, value = converted, type = dataType) + } + ) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt new file mode 100644 index 000000000000..30a0ddc57ac7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToStatement.kt @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder +import io.airbyte.protocol.models.Jsons +import java.sql.Date +import java.sql.PreparedStatement +import java.sql.Time +import java.sql.Timestamp +import java.sql.Types + +class AirbyteValueToStatement { + companion object { + private val toSqlType = AirbyteTypeToSqlType() + private val toSqlValue = AirbyteValueToSqlValue() + private val valueCoercingMapper = + AirbyteValueDeepCoercingMapper( + recurseIntoObjects = false, + recurseIntoArrays = false, + recurseIntoUnions = false, + ) + + fun PreparedStatement.setValue( + idx: Int, + value: AirbyteValue?, + field: MSSQLQueryBuilder.NamedField + ) { + if (value != null && value !is NullValue && field.type.type is UnionType) { + val objectValue = createUnionObject(field.type.type as UnionType, value) + setAsJsonString(idx, objectValue) + } else { + when (value) { + is ArrayValue -> setAsJsonString(idx, value) + is BooleanValue -> setAsBooleanValue(idx, value) + is DateValue -> setAsDateValue(idx, value) + is IntegerValue -> setAsIntegerValue(idx, value) + NullValue -> setAsNullValue(idx, field.type.type) + is NumberValue -> setAsNumberValue(idx, value) + is ObjectValue -> setAsJsonString(idx, value) + is StringValue -> setAsStringValue(idx, value, field.type.type) + is TimeWithTimezoneValue -> setAsTime(idx, value) + is TimeWithoutTimezoneValue -> setAsTime(idx, value) + is TimestampWithTimezoneValue -> setAsTimestamp(idx, value) + is TimestampWithoutTimezoneValue -> setAsTimestamp(idx, value) + is UnknownValue -> setAsJsonString(idx, value) + null -> setAsNullValue(idx, field.type.type) + } + } + } + + fun PreparedStatement.setAsNullValue(idx: Int, type: AirbyteType) { + val sqlType = toSqlType.convert(type) + setNull(idx, sqlType) + } + + private fun PreparedStatement.setAsBooleanValue(idx: Int, value: BooleanValue) { + setBoolean(idx, value.value) + } + + private fun PreparedStatement.setAsDateValue(idx: Int, value: DateValue) { + setDate(idx, Date.valueOf(value.value)) + } + + private fun PreparedStatement.setAsIntegerValue(idx: Int, value: IntegerValue) { + setLong(idx, value.value.longValueExact()) + } + + private fun PreparedStatement.setAsJsonString(idx: Int, value: AirbyteValue) { + setString(idx, Jsons.serialize(toSqlValue.convert(value))) + } + + private fun PreparedStatement.setAsNumberValue(idx: Int, value: NumberValue) { + setDouble(idx, value.value.toDouble()) + } + + private fun PreparedStatement.setAsStringValue( + idx: Int, + value: StringValue, + type: AirbyteType + ) { + val sqlType = toSqlType.convert(type) + if (sqlType == Types.VARCHAR || sqlType == Types.LONGVARCHAR) { + setString(idx, value.value) + } else { + // TODO: this is a fallback because Values aren't fully typed. + // TODO: this should get refactored once the CDK interface changed wrt types and + // values + if ( + sqlType in + setOf( + Types.DATE, + Types.TIME, + Types.TIME_WITH_TIMEZONE, + Types.TIMESTAMP, + Types.TIMESTAMP_WITH_TIMEZONE + ) + ) { + val coercedValue = valueCoercingMapper.map(value, type) + if (coercedValue.second.isEmpty()) { + when (coercedValue.first) { + is DateValue -> setAsDateValue(idx, coercedValue.first as DateValue) + is TimeWithTimezoneValue -> + setAsTime(idx, coercedValue.first as TimeWithTimezoneValue) + is TimeWithoutTimezoneValue -> + setAsTime(idx, coercedValue.first as TimeWithoutTimezoneValue) + is TimestampWithTimezoneValue -> + setAsTimestamp( + idx, + coercedValue.first as TimestampWithTimezoneValue + ) + is TimestampWithoutTimezoneValue -> + setAsTimestamp( + idx, + coercedValue.first as TimestampWithoutTimezoneValue + ) + else -> throw IllegalArgumentException("$value isn't a $type") + } + } else { + throw IllegalArgumentException("$value isn't a $type") + } + } else { + throw IllegalArgumentException("$value isn't a $type") + } + } + } + + private fun PreparedStatement.setAsTime(idx: Int, value: TimeWithTimezoneValue) { + setTime(idx, Time.valueOf(value.value.toLocalTime())) + } + + private fun PreparedStatement.setAsTime(idx: Int, value: TimeWithoutTimezoneValue) { + setTime(idx, Time.valueOf(value.value)) + } + + private fun PreparedStatement.setAsTimestamp(idx: Int, value: TimestampWithTimezoneValue) { + setTimestamp(idx, Timestamp.valueOf(value.value.toLocalDateTime())) + } + + private fun PreparedStatement.setAsTimestamp( + idx: Int, + value: TimestampWithoutTimezoneValue + ) { + setTimestamp(idx, Timestamp.valueOf(value.value)) + } + + private fun createSimpleUnionObject(value: AirbyteValue): ObjectValue { + val unionType = value.toTypeName() + return ObjectValue.from(mapOf("type" to StringValue(unionType), unionType to value)) + } + + private fun createUnionObject(type: UnionType, value: AirbyteValue): AirbyteValue = + if (type.options.all { it is ObjectType }) { + val model = + mutableMapOf>().withDefault { mutableSetOf() } + + type.options.map { + (it as ObjectType).properties.entries.forEach { objectEntry -> + if (model.containsKey(objectEntry.key)) { + model[objectEntry.key]!! += objectEntry.value + } else { + model[objectEntry.key] = mutableSetOf(objectEntry.value) + } + } + } + if (model.values.all { it.size == 1 }) { + value + } else { + val valuesWithConflicts = + (value as ObjectValue) + .values + .entries + .map { pair -> + if (model[pair.key]?.let { it.size > 1 } == true) + Pair(pair.key, createSimpleUnionObject(pair.value)) + else Pair(pair.key, pair.value) + } + .toMap() + ObjectValue.from(valuesWithConflicts) + } + } else { + createSimpleUnionObject(value) + } + + private fun AirbyteValue.toTypeName(): String = + when (this) { + is ArrayValue -> "array" + is BooleanValue -> "boolean" + is DateValue -> "string" + is IntegerValue -> "integer" + NullValue -> "null" + is NumberValue -> "number" + is ObjectValue -> "object" + is StringValue -> "string" + is TimeWithTimezoneValue -> "string" + is TimeWithoutTimezoneValue -> "string" + is TimestampWithTimezoneValue -> "string" + is TimestampWithoutTimezoneValue -> "string" + is UnknownValue -> "oneOf" + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/ResultSetToAirbyteValue.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/ResultSetToAirbyteValue.kt new file mode 100644 index 000000000000..7d60598f7373 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/ResultSetToAirbyteValue.kt @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder +import io.airbyte.integrations.destination.mssql.v2.MSSQLQueryBuilder.NamedValue +import io.airbyte.protocol.models.Jsons +import java.sql.ResultSet +import java.sql.Timestamp +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.format.DateTimeFormatter + +class ResultSetToAirbyteValue { + companion object { + fun ResultSet.getAirbyteNamedValue(field: MSSQLQueryBuilder.NamedField): NamedValue = + when (field.type.type) { + is StringType -> getStringValue(field.name) + is ArrayType -> getArrayValue(field.name) + ArrayTypeWithoutSchema -> getArrayValue(field.name) + BooleanType -> getBooleanValue(field.name) + DateType -> getDateValue(field.name) + IntegerType -> getIntegerValue(field.name) + NumberType -> getNumberValue(field.name) + is ObjectType -> getObjectValue(field.name) + ObjectTypeWithEmptySchema -> getObjectValue(field.name) + ObjectTypeWithoutSchema -> getObjectValue(field.name) + TimeTypeWithTimezone -> getTimeWithTimezoneValue(field.name) + TimeTypeWithoutTimezone -> getTimeWithoutTimezoneValue(field.name) + TimestampTypeWithTimezone -> getTimestampWithTimezoneValue(field.name) + TimestampTypeWithoutTimezone -> getTimestampWithoutTimezoneValue(field.name) + is UnionType -> getObjectValue(field.name) + is UnknownType -> getStringValue(field.name) + } + + private fun ResultSet.getArrayValue(name: String): NamedValue = + getNullable(name, this::getString) + ?.let { ArrayValue.from(deserialize>(it)) } + .toNamedValue(name) + + private fun ResultSet.getBooleanValue(name: String): NamedValue = + getNullable(name, this::getBoolean)?.let { BooleanValue(it) }.toNamedValue(name) + + private fun ResultSet.getDateValue(name: String): NamedValue = + getNullable(name, this::getDate)?.let { DateValue(it.toString()) }.toNamedValue(name) + + private fun ResultSet.getIntegerValue(name: String): NamedValue = + getNullable(name, this::getLong)?.let { IntegerValue(it) }.toNamedValue(name) + + private fun ResultSet.getNumberValue(name: String): NamedValue = + getNullable(name, this::getDouble) + ?.let { NumberValue(it.toBigDecimal()) } + .toNamedValue(name) + + private fun ResultSet.getObjectValue(name: String): NamedValue = + getNullable(name, this::getString) + ?.let { ObjectValue.from(deserialize>(it)) } + .toNamedValue(name) + + private fun ResultSet.getStringValue(name: String): NamedValue = + getNullable(name, this::getString)?.let { StringValue(it) }.toNamedValue(name) + + private fun ResultSet.getTimeWithTimezoneValue(name: String): NamedValue = + getNullable(name, this::getString)?.toTimeWithTimezone().toNamedValue(name) + + private fun ResultSet.getTimeWithoutTimezoneValue(name: String): NamedValue = + getNullable(name, this::getString)?.toTimeWithoutTimezone().toNamedValue(name) + + private fun ResultSet.getTimestampWithTimezoneValue(name: String): NamedValue = + getNullable(name, this::getString)?.toTimestampWithTimezone().toNamedValue(name) + + private fun ResultSet.getTimestampWithoutTimezoneValue(name: String): NamedValue = + getNullable(name, this::getString)?.toTimestampWithoutTimezone().toNamedValue(name) + + private fun AirbyteValue?.toNamedValue(name: String): NamedValue = + if (this != null) NamedValue(name, this) else NamedValue(name, NullValue) + + private fun ResultSet.getNullable(name: String, getter: (String) -> T): T? { + val value = getter(name) + return if (wasNull()) null else value + } + + private inline fun deserialize(value: String): T = + Jsons.deserialize(value, T::class.java) + + internal fun String.toTimeWithTimezone(): TimeWithTimezoneValue = + TimeWithTimezoneValue( + OffsetDateTime.parse( + this, + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS XXX") + ) + .toOffsetTime() + .toString() + ) + + internal fun String.toTimeWithoutTimezone(): TimeWithoutTimezoneValue = + TimeWithoutTimezoneValue(LocalTime.parse(this).toString()) + + internal fun String.toTimestampWithTimezone(): TimestampWithTimezoneValue = + TimestampWithTimezoneValue( + OffsetDateTime.parse( + this, + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS XXX") + ) + .toString() + ) + + internal fun String.toTimestampWithoutTimezone(): TimestampWithoutTimezoneValue = + TimestampWithoutTimezoneValue(Timestamp.valueOf(this).toLocalDateTime().toString()) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt new file mode 100644 index 000000000000..a85975af9bc9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/SqlTypeToMssqlType.kt @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import java.sql.Types + +enum class MssqlType(val sqlType: Int, val sqlStringOverride: String? = null) { + TEXT(Types.LONGVARCHAR), + BIT(Types.BOOLEAN), + DATE(Types.DATE), + BIGINT(Types.BIGINT), + DECIMAL(Types.DECIMAL, sqlStringOverride = "DECIMAL(18, 8)"), + VARCHAR(Types.VARCHAR, sqlStringOverride = "VARCHAR(MAX)"), + DATETIMEOFFSET(Types.TIMESTAMP_WITH_TIMEZONE), + TIME(Types.TIME), + DATETIME(Types.TIMESTAMP); + + val sqlString: String = sqlStringOverride ?: name + + companion object { + val fromSqlType: Map = + entries + .associateByTo(mutableMapOf()) { it.sqlType } + .apply { this[Types.TIME_WITH_TIMEZONE] = DATETIMEOFFSET } + .toMap() + } +} + +class SqlTypeToMssqlType { + fun convert(type: Int): MssqlType = + MssqlType.fromSqlType.get(type) ?: throw IllegalArgumentException("type $type not found") +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt new file mode 100644 index 000000000000..59f4457b61e1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.model + +import java.sql.Types + +/** + * Representation of a colum in a SQL table. + * + * @param name The name of the column + * @param type The data type of the column (see [Types] for values). + * @param isPrimaryKey Whether the column represents a primary key. + * @param isNullable Whether the column's value supports null values. + */ +data class SqlColumn( + val name: String, + val type: Int, + val isPrimaryKey: Boolean = false, + val isNullable: Boolean = false +) + +/** + * Representation of a SQL table. + * + * @param columns The list of columns in the table. + */ +data class SqlTable(val columns: List) + +/** + * Representation of a value in a SQL row/column cell. + * + * @param name The name of the column. + * @param value The value of the row/column cell. + * @param type The SQL type of the row/column cell (see [Types] for values). + */ +data class SqlTableRowValue(val name: String, val value: Any?, val type: Int) + +/** + * Representation of a row of values in a SQL table. + * + * @param values A list of values stored in the row. + */ +data class SqlTableRow(val values: List) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt new file mode 100644 index 000000000000..2d4419ec8ead --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLCheckTest.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.check.CheckIntegrationTest +import io.airbyte.cdk.load.check.CheckTestConfig +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll + +internal class MSSQLCheckTest : + CheckIntegrationTest( + successConfigFilenames = + listOf( + CheckTestConfig(MSSQLTestConfigUtil.getConfigPath("check/valid.json")), + CheckTestConfig(MSSQLTestConfigUtil.getConfigPath("check/valid-ssl-trust.json")), + ), + failConfigFilenamesAndFailureReasons = + mapOf( + CheckTestConfig( + MSSQLTestConfigUtil.getConfigPath("check/fail-internal-schema-invalid.json") + ) to "\"iamnotthere\" either does not exist".toPattern(), + ), + configUpdater = MSSQLConfigUpdater() + ) { + + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } + + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt new file mode 100644 index 000000000000..b37c96f071fa --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLContainerHelper.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2025 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.test.util.ConfigurationUpdater +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getIpAddress +import io.airbyte.integrations.destination.mssql.v2.MSSQLContainerHelper.getPort +import io.github.oshai.kotlinlogging.KotlinLogging +import org.testcontainers.containers.MSSQLServerContainer +import org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT + +val logger = KotlinLogging.logger {} + +/** + * Helper class for launching/stopping MSSQL Server test containers, as well as updating destination + * configuration to match test container configuration. + */ +object MSSQLContainerHelper { + + private val testContainer = + MSSQLServerContainer("mcr.microsoft.com/mssql/server:2022-latest") + .acceptLicense() + .withLogConsumer({ e -> logger.debug { e.utf8String } }) + + fun start() { + if (!testContainer.isRunning()) { + testContainer.start() + } + } + fun stop() { + if (testContainer.isRunning()) { + testContainer.stop() + testContainer.close() + } + } + + fun getHost(): String = testContainer.host + + fun getPassword(): String = testContainer.password + + fun getPort(): Int? = testContainer.getMappedPort(MS_SQL_SERVER_PORT) + + fun getIpAddress(): String? { + // Ensure that the container is started first + start() + return testContainer.containerInfo.networkSettings.networks.entries.first().value.ipAddress + } +} + +class MSSQLConfigUpdater : ConfigurationUpdater { + override fun update(config: String): String { + var updatedConfig = config + + // If not running the connector in docker, we must use the mapped port to connect to the + // database. Otherwise, get the container's IP address for the host + updatedConfig = + if (System.getenv("AIRBYTE_CONNECTOR_INTEGRATION_TEST_RUNNER") != "docker") { + getPort()?.let { updatedConfig.replace("$MS_SQL_SERVER_PORT", it.toString()) } + ?: updatedConfig + } else { + getIpAddress()?.let { config.replace("localhost", it) } ?: updatedConfig + } + + updatedConfig = updatedConfig.replace("replace_me", MSSQLContainerHelper.getPassword()) + logger.debug { "Using updated MSSQL configuration: $updatedConfig" } + return updatedConfig + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLSpecTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLSpecTest.kt new file mode 100644 index 000000000000..3601e6a818ec --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLSpecTest.kt @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.spec.SpecTest + +class MSSQLSpecTest : SpecTest() {} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLTestConfigUtil.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLTestConfigUtil.kt new file mode 100644 index 000000000000..59bc20e8e1d8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLTestConfigUtil.kt @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import java.nio.file.Path + +object MSSQLTestConfigUtil { + fun getConfigPath(relativePath: String): Path = + Path.of( + this::class.java.classLoader.getResource(relativePath)?.toURI() + ?: throw IllegalArgumentException("Resource $relativePath could not be found") + ) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt new file mode 100644 index 000000000000..a658e9e6e496 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriterTest.kt @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.command.ConfigurationSpecification +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.message.Meta +import io.airbyte.cdk.load.test.util.DestinationCleaner +import io.airbyte.cdk.load.test.util.DestinationDataDumper +import io.airbyte.cdk.load.test.util.OutputRecord +import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior +import io.airbyte.cdk.load.write.StronglyTyped +import io.airbyte.cdk.load.write.UnionBehavior +import io.airbyte.integrations.destination.mssql.v2.config.DataSourceFactory +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfigurationFactory +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification +import io.airbyte.protocol.models.Jsons +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta +import java.nio.file.Files +import java.time.Instant +import java.util.UUID +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Disabled + +abstract class MSSQLWriterTest( + configPath: String, + dataDumper: DestinationDataDumper, + dataCleaner: DestinationCleaner, +) : + BasicFunctionalityIntegrationTest( + configContents = Files.readString(MSSQLTestConfigUtil.getConfigPath(configPath)), + configSpecClass = MSSQLSpecification::class.java, + dataDumper = dataDumper, + destinationCleaner = dataCleaner, + isStreamSchemaRetroactive = true, + supportsDedup = true, + stringifySchemalessObjects = false, + preserveUndeclaredFields = false, + commitDataIncrementally = true, + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), + nullEqualsUnset = true, + supportFileTransfer = false, + envVars = emptyMap(), + configUpdater = MSSQLConfigUpdater(), + schematizedArrayBehavior = SchematizedNestedValueBehavior.STRONGLY_TYPE, + schematizedObjectBehavior = SchematizedNestedValueBehavior.PASS_THROUGH, + unionBehavior = UnionBehavior.PROMOTE_TO_OBJECT, + nullUnknownTypes = false, + ) + +class MSSQLDataDumper : DestinationDataDumper { + override fun dumpRecords( + spec: ConfigurationSpecification, + stream: DestinationStream + ): List { + val config = getConfiguration(spec = spec as MSSQLSpecification, stream = stream) + val sqlBuilder = MSSQLQueryBuilder(config, stream) + val dataSource = DataSourceFactory().dataSource(config) + val output = mutableListOf() + dataSource.connection.use { connection -> + SELECT_FROM.toQuery(sqlBuilder.fqTableName).executeQuery(connection) { rs -> + while (rs.next()) { + val objectValue = sqlBuilder.readResult(rs, sqlBuilder.finalTableSchema) + val record = + OutputRecord( + rawId = + rs.getString(MSSQLQueryBuilder.AIRBYTE_RAW_ID)?.let { + UUID.fromString(it) + }, + extractedAt = + Instant.ofEpochMilli( + rs.getLong(MSSQLQueryBuilder.AIRBYTE_EXTRACTED_AT) + ), + loadedAt = null, + generationId = rs.getLong(MSSQLQueryBuilder.AIRBYTE_GENERATION_ID), + data = objectValue, + airbyteMeta = + rs.getString(MSSQLQueryBuilder.AIRBYTE_META)?.let { + val meta = + Jsons.deserialize(it, AirbyteRecordMessageMeta::class.java) + OutputRecord.Meta( + changes = + meta.changes + .map { change -> + Meta.Change( + field = change.field, + change = change.change, + reason = change.reason, + ) + } + .toList(), + syncId = + meta.additionalProperties["syncId"] + ?.toString() + ?.toLong() + ) + }, + ) + output.add(record) + } + } + } + return output + } + + override fun dumpFile( + spec: ConfigurationSpecification, + stream: DestinationStream + ): List { + return emptyList() + } + + private fun getConfiguration( + spec: ConfigurationSpecification, + stream: DestinationStream + ): MSSQLConfiguration { + /* + * Replace the host, port and schema to match what is exposed + * by the container and generated by the test suite in the case of the schema name + */ + val configOverrides = + mutableMapOf("host" to MSSQLContainerHelper.getHost()).apply { + MSSQLContainerHelper.getPort()?.let { port -> put("port", port.toString()) } + stream.descriptor.namespace?.let { schema -> put("schema", schema) } + } + return MSSQLConfigurationFactory() + .makeWithOverrides(spec = spec as MSSQLSpecification, overrides = configOverrides) + } +} + +class MSSQLDataCleaner : DestinationCleaner { + override fun cleanup() { + // TODO("Not yet implemented") + } +} + +internal class StandardInsert : + MSSQLWriterTest( + "check/valid.json", + MSSQLDataDumper(), + MSSQLDataCleaner(), + ) { + + companion object { + @JvmStatic + @BeforeAll + fun beforeAll() { + MSSQLContainerHelper.start() + } + + @JvmStatic + @AfterAll + fun afterAll() { + MSSQLContainerHelper.stop() + } + } + + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11440") + override fun testInterruptedTruncateWithoutPriorData() { + super.testInterruptedTruncateWithoutPriorData() + } + + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11440") + override fun testInterruptedTruncateWithPriorData() { + super.testInterruptedTruncateWithPriorData() + } + + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11440") + override fun resumeAfterCancelledTruncate() { + super.resumeAfterCancelledTruncate() + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/fail-internal-schema-invalid.json b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/fail-internal-schema-invalid.json new file mode 100644 index 000000000000..f9ae78fd6890 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/fail-internal-schema-invalid.json @@ -0,0 +1,10 @@ +{ + "host": "localhost", + "port": 1433, + "database": "master", + "schema": "dbo", + "raw_data_schema": "iamnotthere", + "ssl_method": { "name": "unencrypted" }, + "user": "sa", + "password": "replace_me" +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid-ssl-trust.json b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid-ssl-trust.json new file mode 100644 index 000000000000..385ecab55ae2 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid-ssl-trust.json @@ -0,0 +1,10 @@ +{ + "host": "localhost", + "port": 1433, + "database": "master", + "schema": "dbo", + "raw_data_schema": "guest", + "ssl_method": { "name": "encrypted_trust_server_certificate" }, + "user": "sa", + "password": "replace_me" +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid.json b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid.json new file mode 100644 index 000000000000..b91c5d8e44b4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/check/valid.json @@ -0,0 +1,10 @@ +{ + "host": "localhost", + "port": 1433, + "database": "master", + "schema": "dbo", + "raw_data_schema": "guest", + "ssl_method": { "name": "unencrypted" }, + "user": "sa", + "password": "replace_me" +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-cloud.json new file mode 100644 index 000000000000..5cee2a681685 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-cloud.json @@ -0,0 +1,136 @@ +{ + "documentationUrl" : "https://docs.airbyte.com/integrations/destinations/mssql-v2", + "connectionSpecification" : { + "$schema" : "http://json-schema.org/draft-07/schema#", + "title" : "MSSQL V2 Destination Specification", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "host" : { + "type" : "string", + "description" : "The host name of the MSSQL database.", + "title" : "Host", + "order" : 0 + }, + "port" : { + "type" : "integer", + "description" : "The port of the MSSQL database.", + "title" : "Port", + "minimum" : 0, + "maximum" : 65536, + "examples" : [ "1433" ], + "order" : 1 + }, + "database" : { + "type" : "string", + "description" : "The name of the MSSQL database.", + "title" : "DB Name", + "order" : 2 + }, + "schema" : { + "type" : "string", + "description" : "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"public\".", + "title" : "Default Schema", + "examples" : [ "public" ], + "default" : "public", + "order" : 3 + }, + "user" : { + "type" : "string", + "description" : "The username which is used to access the database.", + "title" : "User", + "order" : 4 + }, + "password" : { + "type" : "string", + "description" : "The password associated with this username.", + "title" : "Password", + "airbyte_secret" : true, + "order" : 5 + }, + "jdbc_url_params" : { + "type" : "string", + "description" : "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "title" : "JDBC URL Params", + "order" : 6 + }, + "raw_data_schema" : { + "type" : "string", + "description" : "The schema to write raw tables into (default: airbyte_internal)", + "title" : "Raw Table Schema Name", + "default" : "airbyte_internal", + "order" : 5 + }, + "ssl_method" : { + "oneOf" : [ { + "title" : "Unencrypted", + "type" : "object", + "additionalProperties" : true, + "description" : "The data transfer will not be encrypted.", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "unencrypted" ], + "default" : "unencrypted" + } + }, + "required" : [ "name" ] + }, { + "title" : "Encrypted (trust server certificate)", + "type" : "object", + "additionalProperties" : true, + "description" : "Use the certificate provided by the server without verification. (For testing purposes only!)", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "encrypted_trust_server_certificate" ], + "default" : "encrypted_trust_server_certificate" + } + }, + "required" : [ "name" ] + }, { + "title" : "Encrypted (verify certificate)", + "type" : "object", + "additionalProperties" : true, + "description" : "Verify and use the certificate provided by the server.", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "encrypted_verify_certificate" ], + "default" : "encrypted_verify_certificate" + }, + "trustStoreName" : { + "type" : "string", + "description" : "Specifies the name of the trust store.", + "title" : "Trust Store Name", + "order" : 1 + }, + "trustStorePassword" : { + "type" : "string", + "description" : "Specifies the password of the trust store.", + "title" : "Trust Store Password", + "airbyte_secret" : true, + "order" : 2 + }, + "hostNameInCertificate" : { + "type" : "string", + "description" : "Specifies the host name of the server. The value of this property must match the subject property of the certificate.", + "title" : "Host Name In Certificate", + "order" : 3 + } + }, + "required" : [ "name" ] + } ], + "description" : "The encryption method which is used to communicate with the database.", + "title" : "SSL Method", + "order" : 8, + "type" : "object" + } + }, + "required" : [ "host", "port", "database", "schema", "raw_data_schema", "ssl_method" ] + }, + "supportsIncremental" : true, + "supportsNormalization" : false, + "supportsDBT" : false, + "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-oss.json new file mode 100644 index 000000000000..5cee2a681685 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/resources/expected-spec-oss.json @@ -0,0 +1,136 @@ +{ + "documentationUrl" : "https://docs.airbyte.com/integrations/destinations/mssql-v2", + "connectionSpecification" : { + "$schema" : "http://json-schema.org/draft-07/schema#", + "title" : "MSSQL V2 Destination Specification", + "type" : "object", + "additionalProperties" : true, + "properties" : { + "host" : { + "type" : "string", + "description" : "The host name of the MSSQL database.", + "title" : "Host", + "order" : 0 + }, + "port" : { + "type" : "integer", + "description" : "The port of the MSSQL database.", + "title" : "Port", + "minimum" : 0, + "maximum" : 65536, + "examples" : [ "1433" ], + "order" : 1 + }, + "database" : { + "type" : "string", + "description" : "The name of the MSSQL database.", + "title" : "DB Name", + "order" : 2 + }, + "schema" : { + "type" : "string", + "description" : "The default schema tables are written to if the source does not specify a namespace. The usual value for this field is \"public\".", + "title" : "Default Schema", + "examples" : [ "public" ], + "default" : "public", + "order" : 3 + }, + "user" : { + "type" : "string", + "description" : "The username which is used to access the database.", + "title" : "User", + "order" : 4 + }, + "password" : { + "type" : "string", + "description" : "The password associated with this username.", + "title" : "Password", + "airbyte_secret" : true, + "order" : 5 + }, + "jdbc_url_params" : { + "type" : "string", + "description" : "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "title" : "JDBC URL Params", + "order" : 6 + }, + "raw_data_schema" : { + "type" : "string", + "description" : "The schema to write raw tables into (default: airbyte_internal)", + "title" : "Raw Table Schema Name", + "default" : "airbyte_internal", + "order" : 5 + }, + "ssl_method" : { + "oneOf" : [ { + "title" : "Unencrypted", + "type" : "object", + "additionalProperties" : true, + "description" : "The data transfer will not be encrypted.", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "unencrypted" ], + "default" : "unencrypted" + } + }, + "required" : [ "name" ] + }, { + "title" : "Encrypted (trust server certificate)", + "type" : "object", + "additionalProperties" : true, + "description" : "Use the certificate provided by the server without verification. (For testing purposes only!)", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "encrypted_trust_server_certificate" ], + "default" : "encrypted_trust_server_certificate" + } + }, + "required" : [ "name" ] + }, { + "title" : "Encrypted (verify certificate)", + "type" : "object", + "additionalProperties" : true, + "description" : "Verify and use the certificate provided by the server.", + "properties" : { + "name" : { + "type" : "string", + "enum" : [ "encrypted_verify_certificate" ], + "default" : "encrypted_verify_certificate" + }, + "trustStoreName" : { + "type" : "string", + "description" : "Specifies the name of the trust store.", + "title" : "Trust Store Name", + "order" : 1 + }, + "trustStorePassword" : { + "type" : "string", + "description" : "Specifies the password of the trust store.", + "title" : "Trust Store Password", + "airbyte_secret" : true, + "order" : 2 + }, + "hostNameInCertificate" : { + "type" : "string", + "description" : "Specifies the host name of the server. The value of this property must match the subject property of the certificate.", + "title" : "Host Name In Certificate", + "order" : 3 + } + }, + "required" : [ "name" ] + } ], + "description" : "The encryption method which is used to communicate with the database.", + "title" : "SSL Method", + "order" : 8, + "type" : "object" + } + }, + "required" : [ "host", "port", "database", "schema", "raw_data_schema", "ssl_method" ] + }, + "supportsIncremental" : true, + "supportsNormalization" : false, + "supportsDBT" : false, + "supported_destination_sync_modes" : [ "overwrite", "append", "append_dedup" ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/ResultSetToAirbyteValueTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/ResultSetToAirbyteValueTest.kt new file mode 100644 index 000000000000..e83331099ae7 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/ResultSetToAirbyteValueTest.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.data.TimeWithTimezoneValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue +import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.toTimeWithTimezone +import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.toTimeWithoutTimezone +import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.toTimestampWithTimezone +import io.airbyte.integrations.destination.mssql.v2.convert.ResultSetToAirbyteValue.Companion.toTimestampWithoutTimezone +import kotlin.test.assertEquals +import org.junit.jupiter.api.Test + +class ResultSetToAirbyteValueTest { + @Test + fun `test TimeWithTimezone read`() { + val actual = "1970-01-01 12:34:56.0000000 +00:00".toTimeWithTimezone() + val expected = TimeWithTimezoneValue("12:34:56Z") + assertEquals(expected, actual) + } + + @Test + fun `test TimeWithoutTimezone read`() { + val actual = "12:34:56.0000000".toTimeWithoutTimezone() + val expected = TimeWithoutTimezoneValue("12:34:56") + assertEquals(expected, actual) + } + + @Test + fun `test TimestampWithTimezone read`() { + val actual = "2023-01-23 12:34:56.0000000 +00:00".toTimestampWithTimezone() + val expected = TimestampWithTimezoneValue("2023-01-23T12:34:56Z") + assertEquals(expected, actual) + } + + @Test + fun `test TimestampWithoutTimezone read`() { + val actual = "2023-01-23 12:34:56.0".toTimestampWithoutTimezone() + val expected = TimestampWithoutTimezoneValue("2023-01-23T12:34:56") + assertEquals(expected, actual) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactoryTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactoryTest.kt new file mode 100644 index 000000000000..8dbd1d1f28ce --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/config/DataSourceFactoryTest.kt @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.config + +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import org.junit.jupiter.api.Test + +internal class DataSourceFactoryTest { + + @Test + fun `test data source base url conversion`() { + val config = + Fixtures.defaultConfig.copy( + host = "myhost", + port = 1234, + database = "db", + ) + val dataSource = config.toSQLServerDataSource() + assertTrue { dataSource.url.startsWith("jdbc:sqlserver://myhost:1234;databaseName=db;") } + } + + @Test + fun `test data source handles optional passwords conversion`() { + val config = + Fixtures.defaultConfig.copy( + user = "airbyte-test", + password = null, + ) + val dataSource = config.toSQLServerDataSource() + assertEquals("airbyte-test", dataSource.user) + } + + @Test + fun `test jdbc params passthrough`() { + val config = Fixtures.defaultConfig.copy(jdbcUrlParams = "custom=params") + val dataSource = config.toSQLServerDataSource() + assertTrue { dataSource.url.endsWith(";custom=params") } + } + + @Test + fun `test unencrypted config`() { + val config = Fixtures.defaultConfig.copy(sslMethod = Unencrypted()) + val dataSource = config.toSQLServerDataSource() + assertTrue { dataSource.url.contains(";encrypt=false") } + assertFalse { dataSource.url.contains(";encrypt=true") } + } + + @Test + fun `test encrypted trust config`() { + val config = Fixtures.defaultConfig.copy(sslMethod = EncryptedTrust()) + val dataSource = config.toSQLServerDataSource() + assertTrue { dataSource.url.contains(";encrypt=true;trustServerCertificate=true") } + assertFalse { dataSource.url.contains(";encrypt=false") } + } + + @Test + fun `test encrypted verify config`() { + val sslMethod = + EncryptedVerify( + trustStoreName = "name", + trustStorePassword = "password", + hostNameInCertificate = "cert-host" + ) + val config = Fixtures.defaultConfig.copy(sslMethod = sslMethod) + val dataSource = config.toSQLServerDataSource() + assertTrue { dataSource.url.contains(";encrypt=true") } + assertTrue { dataSource.url.contains(";trustStoreName=${sslMethod.trustStoreName}") } + assertTrue { + dataSource.url.contains(";trustStorePassword=${sslMethod.trustStorePassword}") + } + assertTrue { + dataSource.url.contains(";hostNameInCertificate=${sslMethod.hostNameInCertificate}") + } + assertFalse { dataSource.url.contains(";encrypt=false") } + assertFalse { dataSource.url.contains(";trustServerCertificate=true") } + } + + object Fixtures { + val defaultConfig = + MSSQLConfiguration( + host = "localhost", + port = 1433, + database = "master", + schema = "dbo", + user = "airbyte", + password = "super secure o//", + jdbcUrlParams = null, + rawDataSchema = "airbyte_internal", + sslMethod = Unencrypted(), + ) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt new file mode 100644 index 000000000000..8df04c36f664 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.mockk.mockk +import java.sql.Types +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test + +class AirbyteTypeToSqlTypeTest { + + private val converter = AirbyteTypeToSqlType() + + @Test + fun testConvertObjectType() { + val objectType = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, false), + "name" to FieldType(StringType, true), + ), + ) + val result = converter.convert(objectType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertArrayType() { + val arrayType = ArrayType(FieldType(IntegerType, false)) + val result = converter.convert(arrayType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertArrayTypeWithoutSchema() { + val arrayType = ArrayTypeWithoutSchema + val result = converter.convert(arrayType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertBooleanType() { + val booleanType = BooleanType + val result = converter.convert(booleanType) + assertEquals(Types.BOOLEAN, result) + } + + @Test + fun testConvertDateType() { + val dateType = DateType + val result = converter.convert(dateType) + assertEquals(Types.DATE, result) + } + + @Test + fun testConvertIntegerType() { + val integerType = IntegerType + val result = converter.convert(integerType) + assertEquals(Types.BIGINT, result) + } + + @Test + fun testConvertNumberType() { + val numberType = NumberType + val result = converter.convert(numberType) + assertEquals(Types.DECIMAL, result) + } + + @Test + fun testConvertObjectTypeWithEmptySchema() { + val objectType = ObjectTypeWithEmptySchema + val result = converter.convert(objectType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertObjectTypeWithoutSchema() { + val objectType = ObjectTypeWithoutSchema + val result = converter.convert(objectType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertStringType() { + val stringType = StringType + val result = converter.convert(stringType) + assertEquals(Types.VARCHAR, result) + } + + @Test + fun testConvertTimeTypeWithTimezone() { + val timeType = TimeTypeWithTimezone + val result = converter.convert(timeType) + assertEquals(Types.TIME_WITH_TIMEZONE, result) + } + + @Test + fun testConvertTimeTypeWithoutTimezone() { + val timeType = TimeTypeWithoutTimezone + val result = converter.convert(timeType) + assertEquals(Types.TIME, result) + } + + @Test + fun testConvertTimestampTypeWithTimezone() { + val timestampType = TimestampTypeWithTimezone + val result = converter.convert(timestampType) + assertEquals(Types.TIMESTAMP_WITH_TIMEZONE, result) + } + + @Test + fun testConvertTimestampTypeWithoutTimezone() { + val timestampType = TimestampTypeWithoutTimezone + val result = converter.convert(timestampType) + assertEquals(Types.TIMESTAMP, result) + } + + @Test + fun testConvertUnionType() { + val unionType = UnionType(setOf(StringType, NumberType)) + val result = converter.convert(unionType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testConvertUnknownType() { + val unknownType = UnknownType(mockk()) + val result = converter.convert(unknownType) + assertEquals(Types.LONGVARCHAR, result) + } + + @Test + fun testToSqlTable() { + val primaryKey = "id" + val nullableColumn = "email" + val objectType = + ObjectType( + linkedMapOf( + primaryKey to FieldType(IntegerType, false), + "age" to FieldType(IntegerType, false), + nullableColumn to FieldType(StringType, true), + ), + ) + val primaryKeys = listOf(listOf(primaryKey)) + val table = objectType.toSqlTable(primaryKeys = primaryKeys) + + assertEquals(objectType.properties.size, table.columns.size) + objectType.properties.forEach { (name, type) -> + val column = table.columns.find { it.name == name } + assertNotNull(column) + assertEquals(converter.convert(type.type), column?.type) + assertEquals(primaryKey == name, column?.isPrimaryKey) + assertEquals(nullableColumn == name, column?.isNullable) + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt new file mode 100644 index 000000000000..3b6788131609 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue +import io.airbyte.cdk.load.data.TimestampWithTimezoneValue +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.cdk.load.util.Jsons +import io.airbyte.cdk.load.util.serializeToJsonBytes +import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn +import io.airbyte.integrations.destination.mssql.v2.model.SqlTable +import java.math.BigDecimal +import java.math.BigInteger +import java.sql.Date +import java.sql.Time +import java.sql.Timestamp +import java.sql.Types +import java.time.ZoneOffset +import org.junit.jupiter.api.Assertions.assertArrayEquals +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test + +internal class AirbyteValueToSqlValueTest { + + private val converter = AirbyteValueToSqlValue() + + @Test + fun testConvertObjectValue() { + val objectValue = + ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))) + val result = converter.convert(objectValue) + assertEquals(LinkedHashMap::class.java, result?.javaClass) + assertEquals(mapOf("id" to 42.toBigInteger(), "name" to "John Doe"), result) + } + + @Test + fun testConvertArrayValue() { + val arrayValue = ArrayValue(listOf(StringValue("John Doe"), IntegerValue(42L))) + val result = converter.convert(arrayValue) + assertEquals(ArrayList::class.java, result?.javaClass) + assertEquals(listOf("John Doe", 42.toBigInteger()), result) + } + + @Test + fun testConvertDateValue() { + val dateValue = DateValue("2024-11-18") + val result = converter.convert(dateValue) + assertEquals(Date::class.java, result?.javaClass) + assertEquals( + dateValue.value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(), + (result as Date).time + ) + } + + @Test + fun testConvertIntegerValue() { + val intValue = IntegerValue(42) + val result = converter.convert(intValue) + assertEquals(BigInteger::class.java, result?.javaClass) + assertEquals(42.toBigInteger(), result) + } + + @Test + fun testConvertNullValue() { + val nullValue = NullValue + val result = converter.convert(nullValue) + assertNull(result) + } + + @Test + fun testConvertNumberValue() { + val numberValue = NumberValue(42.5.toBigDecimal()) + val result = converter.convert(numberValue) + assertEquals(BigDecimal::class.java, result?.javaClass) + assertEquals(42.5.toBigDecimal(), result) + } + + @Test + fun testConvertStringValue() { + val stringValue = StringValue("test") + val result = converter.convert(stringValue) + assertEquals(String::class.java, result?.javaClass) + assertEquals("test", result) + } + + @Test + fun testConvertTimeValue() { + val timeValue = TimeWithoutTimezoneValue("12:34:56") + val result = converter.convert(timeValue) + assertEquals(Time::class.java, result?.javaClass) + assertEquals(Time.valueOf(timeValue.value).time, (result as Time).time) + } + + @Test + fun testConvertTimestampValue() { + val timestampValue = TimestampWithTimezoneValue("2024-11-18T12:34:56Z") + val result = converter.convert(timestampValue) + assertEquals(Timestamp::class.java, result?.javaClass) + assertEquals( + Timestamp.valueOf(timestampValue.value.toLocalDateTime()).time, + (result as Timestamp).time + ) + } + + @Test + fun testConvertUnknownValue() { + val jsonNode = Jsons.createObjectNode().put("id", "unknownValue") + val unknownValue = UnknownValue(jsonNode) + val result = converter.convert(unknownValue) + assertEquals(ByteArray::class.java, result?.javaClass) + assertArrayEquals(Jsons.writeValueAsBytes(unknownValue.value), result as ByteArray) + } + + @Test + fun testToSqlValue() { + val sqlTable = + SqlTable( + listOf( + SqlColumn( + name = "id", + type = Types.INTEGER, + isPrimaryKey = true, + isNullable = false + ), + SqlColumn( + name = "name", + type = Types.VARCHAR, + isPrimaryKey = false, + isNullable = true + ), + SqlColumn( + name = "meta", + type = Types.BLOB, + isPrimaryKey = false, + isNullable = false + ), + SqlColumn( + name = "items", + type = Types.BLOB, + isPrimaryKey = false, + isNullable = false + ) + ) + ) + val objectValue = + ObjectValue( + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("John Doe"), + "meta" to + ObjectValue( + linkedMapOf( + "sync_id" to IntegerValue(123L), + "changes" to + ObjectValue( + linkedMapOf( + "change" to StringValue("insert"), + "reason" to StringValue("reason"), + ) + ) + ) + ), + "items" to ArrayValue(listOf(StringValue("item1"), StringValue("item2"))) + ) + ) + + val sqlValue = objectValue.toSqlValue(sqlTable) + + assertEquals(sqlTable.columns.size, sqlValue.values.size) + assertEquals( + BigInteger::class.java, + sqlValue.values.find { it.name == "id" }?.value?.javaClass + ) + assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) + assertEquals( + String::class.java, + sqlValue.values.find { it.name == "name" }?.value?.javaClass + ) + assertEquals("John Doe", sqlValue.values.find { it.name == "name" }?.value) + assertEquals( + ByteArray::class.java, + sqlValue.values.find { it.name == "meta" }?.value?.javaClass + ) + assertArrayEquals( + mapOf( + "sync_id" to 123.toBigInteger(), + "changes" to + mapOf( + "change" to "insert", + "reason" to "reason", + ) + ) + .serializeToJsonBytes(), + sqlValue.values.find { it.name == "meta" }?.value as ByteArray + ) + assertEquals( + ByteArray::class.java, + sqlValue.values.find { it.name == "items" }?.value?.javaClass + ) + assertArrayEquals( + listOf("item1", "item2").serializeToJsonBytes(), + sqlValue.values.find { it.name == "items" }?.value as ByteArray + ) + } + + @Test + fun testToSqlValueIgnoresFieldsNotInTable() { + val sqlTable = + SqlTable( + listOf( + SqlColumn( + name = "id", + type = Types.INTEGER, + isPrimaryKey = true, + isNullable = false + ), + ) + ) + val objectValue = + ObjectValue( + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("Should be ignored"), + ) + ) + + val sqlValue = objectValue.toSqlValue(sqlTable) + assertEquals(sqlTable.columns.size, sqlValue.values.size) + assertEquals( + BigInteger::class.java, + sqlValue.values.find { it.name == "id" }?.value?.javaClass + ) + assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) + } + + @Test + fun testObjectMapToJsonBytes() { + val objectValue = + ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))) + val objectValueMap = converter.convert(objectValue) + val jsonBytes = objectValueMap?.serializeToJsonBytes() + assertNotNull(jsonBytes) + assertArrayEquals(Jsons.writeValueAsBytes(objectValueMap), jsonBytes) + } +}