Destination Snowflake: Sync Id, generation_id and Meta #39107

Merged · 1 commit · Jun 10, 2024

This change bumps destination-snowflake to CDK 0.37.1 (connector 3.10.0), switches the raw tables to the V2_WITH_GENERATION column set (adding _airbyte_meta and _airbyte_generation_id), adds _airbyte_generation_id to final tables, and ships a migration for existing tables.
build.gradle
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
-cdkVersionRequired = '0.35.15'
+cdkVersionRequired = '0.37.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
metadata.yaml
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-dockerImageTag: 3.9.1
+dockerImageTag: 3.10.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
SnowflakeDestination.kt
@@ -34,6 +34,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeAbMetaAndGenIdMigration
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeDV2Migration
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStagingClient
@@ -43,6 +44,7 @@ import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.util.*
@@ -53,7 +55,6 @@ import javax.sql.DataSource
import net.snowflake.client.core.SFSession
import net.snowflake.client.core.SFStatement
import net.snowflake.client.jdbc.SnowflakeSQLException
-import org.apache.commons.lang3.StringUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -63,7 +64,7 @@ constructor(
private val airbyteEnvironment: String,
private val nameTransformer: NamingConventionTransformer = SnowflakeSQLNameTransformer(),
) : BaseConnector(), Destination {
-private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITHOUT_META
+private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION

override fun check(config: JsonNode): AirbyteConnectionStatus? {
val dataSource = getDataSource(config)
@@ -123,7 +124,8 @@
),
isSchemaMismatch = true,
isFinalTableEmpty = true,
-destinationState = SnowflakeState(false)
+destinationState =
+    SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false)
)
// We simulate a mini-sync so that the raw table code path is exercised, and disable T+D
snowflakeDestinationHandler.createNamespaces(setOf(rawTableSchemaName, outputSchema))
@@ -151,7 +153,10 @@
),
)
streamOperation.writeRecords(streamConfig, listOf(message).stream())
-streamOperation.finalizeTable(streamConfig, StreamSyncSummary.DEFAULT)
+streamOperation.finalizeTable(
+    streamConfig,
+    StreamSyncSummary(1, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE),
+)
// Clean up the raw table; this is intentionally not part of the actual sync code
// because we avoid dropping original tables directly.
snowflakeDestinationHandler.execute(
@@ -190,41 +195,34 @@
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config)

val defaultNamespace = config["schema"].asText()
-for (stream in catalog.streams) {
-    if (StringUtils.isEmpty(stream.stream.namespace)) {
-        stream.stream.namespace = defaultNamespace
-    }
-}

val retentionPeriodDays =
getRetentionPeriodDays(
config[RETENTION_PERIOD_DAYS],
)
val sqlGenerator = SnowflakeSqlGenerator(retentionPeriodDays)
val database = getDatabase(getDataSource(config))
val databaseName = config[JdbcUtils.DATABASE_KEY].asText()
-val rawTableSchemaName: String
-val catalogParser: CatalogParser
-if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
-    rawTableSchemaName = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
-    catalogParser = CatalogParser(sqlGenerator, rawTableSchemaName)
-} else {
-    rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
-    catalogParser = CatalogParser(sqlGenerator)
-}
+val rawTableSchemaName: String =
+    if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
+        getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
+    } else {
+        JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
+    }
+val catalogParser = CatalogParser(sqlGenerator, defaultNamespace, rawTableSchemaName)
val snowflakeDestinationHandler =
SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName)
val parsedCatalog: ParsedCatalog = catalogParser.parseCatalog(catalog)
val disableTypeDedupe =
config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
-val migrations =
-    listOf<Migration<SnowflakeState>>(
+val migrations: List<Migration<SnowflakeState>> =
+    listOf(
SnowflakeDV2Migration(
nameTransformer,
database,
databaseName,
sqlGenerator,
),
+        SnowflakeAbMetaAndGenIdMigration(database),
)

val snowflakeStagingClient = SnowflakeStagingClient(database)
@@ -264,8 +262,7 @@
},
onFlush = DefaultFlush(optimalFlushBatchSize, syncOperation),
catalog = catalog,
-bufferManager = BufferManager(snowflakeBufferMemoryLimit),
-defaultNamespace = Optional.of(defaultNamespace),
+bufferManager = BufferManager(defaultNamespace, snowflakeBufferMemoryLimit)
)
}

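One behavioral note on the SnowflakeDestination.kt changes: the old code mutated each configured stream's namespace inside the connector (the removed StringUtils loop), while the new code hands defaultNamespace to CatalogParser and BufferManager so the CDK applies it downstream. A minimal runnable sketch of that contract, with hypothetical simplified types (Stream and Parser here are illustrations, not the CDK's API):

// Hypothetical, simplified model of the namespace-defaulting contract.
data class Stream(val name: String, val namespace: String?)

// Old approach (removed above): rewrite each stream's namespace before parsing.
fun defaultNamespaceInPlace(streams: List<Stream>, defaultNamespace: String): List<Stream> =
    streams.map { if (it.namespace.isNullOrEmpty()) it.copy(namespace = defaultNamespace) else it }

// New approach: the parser holds the default and resolves it per stream, so the
// connector never mutates the configured catalog.
class Parser(private val defaultNamespace: String) {
    fun resolve(stream: Stream): Pair<String, String> =
        (stream.namespace?.takeIf { it.isNotEmpty() } ?: defaultNamespace) to stream.name
}

fun main() {
    val parser = Parser(defaultNamespace = "PUBLIC")
    println(parser.resolve(Stream("users", null)))      // (PUBLIC, users)
    println(parser.resolve(Stream("orders", "SALES")))  // (SALES, orders)
}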
SnowflakeAbMetaAndGenIdMigration.kt (new file)
@@ -0,0 +1,140 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake.migrations

import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.LinkedHashMap

private val log = KotlinLogging.logger {}

class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
Migration<SnowflakeState> {
override fun migrateIfNecessary(
destinationHandler: DestinationHandler<SnowflakeState>,
stream: StreamConfig,
state: DestinationInitialStatus<SnowflakeState>
): Migration.MigrationResult<SnowflakeState> {
if (state.destinationState.isAirbyteMetaPresentInRaw) {
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because previous destination state has isAirbyteMetaPresent"
}
return Migration.MigrationResult(state.destinationState, false)
}

if (!state.initialRawTableStatus.rawTableExists) {
// The raw table doesn't exist. No migration necessary. Update the state.
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
}
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
false
)
}

        // Snowflake will match the lowercase raw table even with QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE
        val results =
            database.queryJsons(
                "SHOW COLUMNS IN TABLE \"${stream.id.rawNamespace}\".\"${stream.id.rawName}\""
            )
        val rawTableDefinition =
            results
                .groupBy { it.get("schema_name").asText()!! }
                .mapValues { (_, v) ->
                    v.groupBy { it.get("table_name").asText()!! }
                        .mapValuesTo(LinkedHashMap()) { (_, v) ->
                            TableDefinition(
                                v.associateTo(LinkedHashMap()) {
                                    // return value of data_type in show columns is a json string.
                                    val dataType = Jsons.deserialize(it.get("data_type").asText())
                                    it.get("column_name").asText()!! to
                                        ColumnDefinition(
                                            it.get("column_name").asText(),
                                            dataType.get("type").asText(),
                                            0,
                                            dataType.get("nullable").asBoolean(),
                                        )
                                },
                            )
                        }
                }

Review thread on the groupBy above (resolved):

Reviewer (Contributor): We're already restricting to specifically airbyte_internal.whatever, why do we need to group by schema name / table name?

Author (Contributor): Just to avoid a one-element list and a first() call, I guess.

Reviewer (Contributor): I think you get the same result with this, i.e. you don't need to group on schema/table name at all, just directly build the map of column name to definition:

TableDefinition(
    results.associateTo(LinkedHashMap()) {
        // return value of data_type in show columns is a json string.
        val dataType = Jsons.deserialize(it.get("data_type").asText())
        it.get("column_name").asText()!! to
            ColumnDefinition(
                it.get("column_name").asText(),
                dataType.get("type").asText(),
                0,
                dataType.get("nullable").asBoolean(),
            )
    }
)

(... but also, I would hope that kotlin has some easy .first accessor :P)
        // Default is lowercase raw tables; for accounts with QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE
        // we have to match uppercase.
        val isUpperCaseIdentifier =
            !rawTableDefinition.containsKey(stream.id.rawNamespace) &&
                rawTableDefinition.containsKey(stream.id.rawNamespace.uppercase())
        val rawNamespace: String
        val rawName: String
        val abMetaColumn: String
        if (isUpperCaseIdentifier) {
rawNamespace = stream.id.rawNamespace.uppercase()
rawName = stream.id.rawName.uppercase()
abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META.uppercase()
} else {
rawNamespace = stream.id.rawNamespace
rawName = stream.id.rawName
abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META
}
rawTableDefinition[rawNamespace]?.get(rawName)?.let { tableDefinition ->
if (tableDefinition.columns.containsKey(abMetaColumn)) {
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because the raw table already has the airbyte_meta column"
}
} else {
log.info {
"Migrating airbyte_meta/generation_id for table ${stream.id.rawNamespace}.${stream.id.rawName}"
}
// Quote for raw table columns
val alterRawTableSql =
"""
ALTER TABLE "${stream.id.rawNamespace}"."${stream.id.rawName}"
ADD COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT,
COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER;
""".trimIndent()
database.execute(alterRawTableSql)
}
}

        // To avoid another metadata query in Snowflake, we rely on the initial status gathering,
        // which already checks the final table's columns and reports a schema mismatch. Because the
        // mismatch could be caused by the meta columns or by a customer's column, we execute the
        // add column with an IF NOT EXISTS check.
if (state.isFinalTablePresent && state.isSchemaMismatch) {
log.info {
"Migrating generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName}"
}
// explicitly uppercase and quote the final table column.
val alterFinalTableSql =
"""
ALTER TABLE "${stream.id.finalNamespace}"."${stream.id.finalName}"
ADD COLUMN IF NOT EXISTS "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()}" INTEGER;
""".trimIndent()
database.execute(alterFinalTableSql)
// Final table schema changed, fetch the initial status again
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
true
)
}

// Final table is untouched, so we don't need to fetch the initial status
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
false
)
}
}
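The subtlest piece of this new migration is the identifier-case fallback: SHOW COLUMNS can report the raw table under uppercased names when the account sets QUOTED_IDENTIFIERS_IGNORE_CASE, so the code probes the column map under both spellings before deciding which ALTER to run. A runnable sketch of just that lookup, under simplified types (nothing below is the connector's real API):

// Standalone sketch of the case-fallback lookup used by the migration above:
// prefer the exact identifier, fall back to uppercase for accounts where
// quoted identifiers are uppercased.
fun <T> lookupWithCaseFallback(
    tablesBySchema: Map<String, Map<String, T>>,
    namespace: String,
    name: String,
): T? {
    val useUpper =
        !tablesBySchema.containsKey(namespace) &&
            tablesBySchema.containsKey(namespace.uppercase())
    val ns = if (useUpper) namespace.uppercase() else namespace
    val n = if (useUpper) name.uppercase() else name
    return tablesBySchema[ns]?.get(n)
}

fun main() {
    val defs = mapOf("AIRBYTE_INTERNAL" to mapOf("PUBLIC_RAW__STREAM_USERS" to "table-def"))
    // A lowercase id still resolves, via the uppercase fallback:
    println(lookupWithCaseFallback(defs, "airbyte_internal", "public_raw__stream_users"))
}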
SnowflakeDV2Migration.kt
@@ -31,6 +31,9 @@ class SnowflakeDV2Migration(
): Migration.MigrationResult<SnowflakeState> {
log.info { "Initializing DV2 Migration check" }
legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream)
-    return Migration.MigrationResult(SnowflakeState(false), true)
+    return Migration.MigrationResult(
+        SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false),
+        true
+    )
}
}
SnowflakeState.kt
@@ -8,7 +8,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

// Note the nonnullable fields. Even though the underlying storage medium (a JSON blob) supports
// nullability, we don't want to deal with that in our codebase.
-data class SnowflakeState(val needsSoftReset: Boolean) : MinimumDestinationState {
+data class SnowflakeState(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean) :
+    MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}
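Why the new flag later deserializes from a differently named key: for an `is`-prefixed Kotlin Boolean property, Jackson's bean-naming conventions drop the `is` when serializing, which is the quirk the toDestinationState comment in SnowflakeDestinationHandler.kt (below) works around. A small sketch of the behavior (assumes jackson-module-kotlin on the classpath; exact behavior can vary by module version):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule

// Same shape as SnowflakeState; the "is"-prefixed Boolean property is the point.
data class State(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean)

fun main() {
    val mapper = ObjectMapper().registerKotlinModule()
    // Jackson's bean conventions treat an isXxx getter as property "xxx",
    // so the "is" prefix is dropped in the JSON:
    println(mapper.writeValueAsString(State(needsSoftReset = false, isAirbyteMetaPresentInRaw = true)))
    // -> {"needsSoftReset":false,"airbyteMetaPresentInRaw":true}
}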
SnowflakeStorageOperation.kt
@@ -51,7 +51,9 @@ class SnowflakeStorageOperation(
| "${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID}" VARCHAR PRIMARY KEY,
| "${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
| "${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
| "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT
| "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT,
| "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT DEFAULT NULL,
| "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER DEFAULT NULL
|) data_retention_time_in_days = $retentionPeriodDays;
""".trimMargin()
}
@@ -60,11 +62,16 @@
return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n"
}

-override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
-    val stageName = getStageName(streamId)
+override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+    val stageName = getStageName(streamConfig.id)
    val stagingPath = getStagingPath()
    val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath)
-    staging.copyIntoTableFromStage(stageName, stagingPath, listOf(stagedFileName), streamId)
+    staging.copyIntoTableFromStage(
+        stageName,
+        stagingPath,
+        listOf(stagedFileName),
+        streamConfig.id
+    )
}
override fun cleanupStage(streamId: StreamId) {
val stageName = getStageName(streamId)
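For a concrete picture of what the DDL template above now produces, here is a rendered example of the raw-table CREATE statement. The inlined values (the literal column names such as _airbyte_meta and _airbyte_generation_id, the schema/table names, the leading CREATE line, and the 1-day retention) are assumptions for illustration; only the column list itself comes verbatim from the diff:

// Sketch: roughly the raw-table DDL rendered after this change, with the
// usual Airbyte column-name constants inlined (assumed values).
val createRawTableSql = """
    CREATE TABLE IF NOT EXISTS "airbyte_internal"."public_raw__stream_users" (
        "_airbyte_raw_id" VARCHAR PRIMARY KEY,
        "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
        "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
        "_airbyte_data" VARIANT,
        "_airbyte_meta" VARIANT DEFAULT NULL,
        "_airbyte_generation_id" INTEGER DEFAULT NULL
    ) data_retention_time_in_days = 1;
""".trimIndent()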
SnowflakeDestinationHandler.kt
@@ -100,11 +100,19 @@ class SnowflakeDestinationHandler(
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
try {
-databaseMetaData
-    .getTables(databaseName, id.rawNamespace, id.rawName, null)
-    .use { tables ->
-        return@executeMetadataQuery tables.next()
-    }
+val rs =
+    databaseMetaData.getTables(databaseName, id.rawNamespace, id.rawName, null)
+// When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
+// interpreted as uppercase in db metadata calls. Check for both.
+val rsUppercase =
+    databaseMetaData.getTables(
+        databaseName,
+        id.rawNamespace.uppercase(),
+        id.rawName.uppercase(),
+        null
+    )
+rs.next() || rsUppercase.next()
} catch (e: SQLException) {
LOGGER.error("Failed to retrieve table metadata", e)
throw RuntimeException(e)
@@ -287,6 +295,14 @@
"VARIANT" == existingTable.columns[abMetaColumnName]!!.type
}

+fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean {
+    val abGenerationIdColumnName: String =
+        JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase(Locale.getDefault())
+    return existingTable.columns.containsKey(abGenerationIdColumnName) &&
+        toJdbcTypeName(AirbyteProtocolType.INTEGER) ==
+            existingTable.columns[abGenerationIdColumnName]!!.type
+}

@SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
override fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
@@ -299,7 +315,8 @@
if (
!isAirbyteRawIdColumnMatch(existingTable) ||
!isAirbyteExtractedAtColumnMatch(existingTable) ||
-    !isAirbyteMetaColumnMatch(existingTable)
+    !isAirbyteMetaColumnMatch(existingTable) ||
+    !isAirbyteGenerationIdColumnMatch(existingTable)
) {
// Missing AB meta columns in the final table; we need them for proper T+D, so
// trigger a soft reset
@@ -417,8 +434,13 @@
}

override fun toDestinationState(json: JsonNode): SnowflakeState {
+// Note: the field is named isAirbyteMetaPresentInRaw, but Jackson serializes it as
+// "airbyteMetaPresentInRaw", so we read that key here and map it back to the field.
return SnowflakeState(
-    json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean()
+    json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
+    json.hasNonNull("airbyteMetaPresentInRaw") &&
+        json["airbyteMetaPresentInRaw"].asBoolean()
)
}

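Net effect of the handler changes: a final table without the uppercased generation-id column now fails the meta-column check and triggers a soft reset, which is why the migration adds the column up front with IF NOT EXISTS. A condensed, hypothetical sketch of that gate (the types and the JDBC type-name strings such as "TIMESTAMPTZ" and "NUMBER" are assumptions, not the real toJdbcTypeName mapping):

// Condensed sketch of the soft-reset gate after this PR. Simplified types.
data class ColumnDef(val type: String)
data class TableDef(val columns: Map<String, ColumnDef>)

fun metaColumnsMatch(existing: TableDef): Boolean {
    fun matches(name: String, type: String) = existing.columns[name]?.type == type
    return matches("_AIRBYTE_RAW_ID", "VARCHAR") &&
        matches("_AIRBYTE_EXTRACTED_AT", "TIMESTAMPTZ") &&
        matches("_AIRBYTE_META", "VARIANT") &&
        matches("_AIRBYTE_GENERATION_ID", "NUMBER") // the new check in this PR
}

fun main() {
    // A pre-upgrade final table: no generation_id column yet.
    val preUpgrade = TableDef(
        mapOf(
            "_AIRBYTE_RAW_ID" to ColumnDef("VARCHAR"),
            "_AIRBYTE_EXTRACTED_AT" to ColumnDef("TIMESTAMPTZ"),
            "_AIRBYTE_META" to ColumnDef("VARIANT"),
        )
    )
    // false -> schema mismatch -> soft reset, unless the migration added the column first.
    println(metaColumnsMatch(preUpgrade))
}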