Skip to content
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ kotlin {

dependencies {
api("com.github.f4b6a3:uuid-creator:6.1.1")
implementation 'commons-codec:commons-codec:1.16.0'

implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation 'org.apache.commons:commons-lang3:3.17.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import io.airbyte.cdk.load.command.NamespaceMapper
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.schema.model.ColumnSchema
import io.airbyte.cdk.load.schema.model.StreamTableSchema
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.cdk.load.schema.model.TableNames
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.cdk.load.write.WriteOperation
import io.github.oshai.kotlinlogging.KotlinLogging
Expand Down Expand Up @@ -46,7 +50,18 @@ interface DestinationChecker<C : DestinationConfiguration> {
generationId = 1,
minimumGenerationId = 0,
syncId = 1,
namespaceMapper = NamespaceMapper()
namespaceMapper = NamespaceMapper(),
tableSchema =
StreamTableSchema(
tableNames = TableNames(finalTableName = TableName("testing", "test")),
columnSchema =
ColumnSchema(
rawSchema = mapOf(),
rawToFinalColumnNames = mapOf(),
finalColumnSchema = mapOf(),
),
importType = Append,
)
)

fun check(config: C)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.schema.TableNameResolver
import io.airbyte.cdk.load.schema.model.ColumnSchema
import io.airbyte.cdk.load.schema.model.StreamTableSchema
import io.airbyte.cdk.load.schema.model.TableNames
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.time.LocalDate
Expand Down Expand Up @@ -91,45 +95,73 @@ data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()
}
}

/**
 * Contract for a component that can produce a [DestinationCatalog].
 *
 * NOTE(review): no type in the visible part of this file declares this interface as a
 * supertype - confirm where it is implemented before relying on it.
 */
interface DestinationCatalogFactory {
/** Builds and returns a [DestinationCatalog]. */
fun make(): DestinationCatalog
}

/**
 * Micronaut factory that provides the [DestinationCatalog] bean.
 *
 * Two mutually exclusive providers are declared, selected by the value of [Operation.PROPERTY]:
 * [syncCatalog] for `sync` and [checkCatalog] for `check`.
 */
@Factory
class DefaultDestinationCatalogFactory {
    /**
     * Builds the catalog for a sync from the configured Airbyte catalog.
     *
     * Table names are resolved for all streams in a single call to [tableNameResolver], then each
     * configured stream is converted by [streamFactory] together with its resolved name.
     *
     * @param catalog the configured catalog whose streams are converted
     * @param streamFactory converts one configured stream plus its table name into a stream
     * @param tableNameResolver supplies the table name for every stream descriptor
     * @return a catalog holding one [DestinationStream] per configured stream, in catalog order
     * @throws IllegalStateException if the resolver returned no table name for a stream
     */
    @Requires(property = Operation.PROPERTY, value = "sync")
    @Singleton
    fun syncCatalog(
        catalog: ConfiguredAirbyteCatalog,
        streamFactory: DestinationStreamFactory,
        tableNameResolver: TableNameResolver,
    ): DestinationCatalog {
        val descriptors =
            catalog.streams
                .map { DestinationStream.Descriptor(it.stream.namespace, it.stream.name) }
                .toSet()
        val names = tableNameResolver.getTableNameMapping(descriptors)

        return DestinationCatalog(
            streams =
                catalog.streams.map { configuredStream ->
                    val key =
                        DestinationStream.Descriptor(
                            configuredStream.stream.namespace,
                            configuredStream.stream.name,
                        )
                    // Fail with the offending stream in the message instead of a bare NPE.
                    val tableName =
                        names[key] ?: error("No table name was resolved for stream $key")
                    streamFactory.make(configuredStream, tableName)
                }
        )
    }

    /**
     * Warning: Most destinations do not use this.
     *
     * Catalog stub for running SYNC from within a CHECK operation.
     *
     * Used exclusively by the DefaultDestinationChecker.
     *
     * The stub holds a single append-mode stream with one nullable integer column named `test`.
     * Its name (and its namespace, unless [checkNamespace] is given) embeds today's date and a
     * random suffix.
     *
     * @param checkNamespace namespace to use for the stub stream; when null one is generated from
     * [CHECK_STREAM_NAMESPACE]
     * @param namespaceMapper mapper handed to the stub stream
     * @return a catalog containing only the stub stream
     */
    @Requires(property = Operation.PROPERTY, value = "check")
    @Singleton
    fun checkCatalog(
        @Named("checkNamespace") checkNamespace: String?,
        namespaceMapper: NamespaceMapper
    ): DestinationCatalog {
        // generate a string like "20240523"
        val date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))
        // generate 5 random characters
        val random = RandomStringUtils.insecure().nextAlphabetic(5).lowercase()
        val namespace = checkNamespace ?: "${CHECK_STREAM_NAMESPACE}_$date$random"
        return DestinationCatalog(
            listOf(
                DestinationStream(
                    unmappedNamespace = namespace,
                    unmappedName = "test$date$random",
                    importType = Append,
                    schema =
                        ObjectType(linkedMapOf("test" to FieldType(IntegerType, nullable = true))),
                    generationId = 1,
                    minimumGenerationId = 0,
                    syncId = 1,
                    namespaceMapper = namespaceMapper,
                    // Placeholder table schema: no column mappings and default table names.
                    tableSchema =
                        StreamTableSchema(
                            columnSchema =
                                ColumnSchema(
                                    rawSchema = mapOf(),
                                    rawToFinalColumnNames = mapOf(),
                                    finalColumnSchema = mapOf()
                                ),
                            importType = Append,
                            tableNames = TableNames(),
                        ),
                )
            )
        )
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import io.airbyte.cdk.load.data.AirbyteValueProxy
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.collectUnknownPaths
import io.airbyte.cdk.load.data.json.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.Meta
import io.airbyte.cdk.load.schema.model.StreamTableSchema
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.StreamDescriptor
import jakarta.inject.Singleton
import io.github.oshai.kotlinlogging.KotlinLogging

// File-private logger obtained from KotlinLogging.
// NOTE(review): no use of `log` is visible in this excerpt - confirm it is referenced elsewhere.
private val log = KotlinLogging.logger {}

/**
* Internal representation of destination streams. This is intended to be a case class specialized
Expand Down Expand Up @@ -64,7 +66,9 @@ data class DestinationStream(
val includeFiles: Boolean = false,
val destinationObjectName: String? = null,
val matchingKey: List<String>? = null,
private val namespaceMapper: NamespaceMapper
private val namespaceMapper: NamespaceMapper,
// fully munged table and column names with mappings
val tableSchema: StreamTableSchema,
) {
val unmappedDescriptor = Descriptor(namespace = unmappedNamespace, name = unmappedName)
val mappedDescriptor = namespaceMapper.map(namespace = unmappedNamespace, name = unmappedName)
Expand Down Expand Up @@ -181,58 +185,6 @@ fun AirbyteType.computeUnknownColumnChanges() =
)
}

/**
 * Converts protocol-level [ConfiguredAirbyteStream]s into [DestinationStream]s.
 *
 * @property jsonSchemaToAirbyteType converts a stream's JSON schema into the stream's schema type
 * @property namespaceMapper mapper handed to every stream this factory creates
 */
@Singleton
class DestinationStreamFactory(
    private val jsonSchemaToAirbyteType: JsonSchemaToAirbyteType,
    private val namespaceMapper: NamespaceMapper
) {
    /**
     * Builds the [DestinationStream] for one configured stream.
     *
     * The matching key is only derived from the primary key when the stream declares a
     * destination object name; otherwise it is null.
     *
     * @param stream the configured stream to convert
     * @return the equivalent [DestinationStream]
     * @throws IllegalArgumentException if the destination sync mode is null, or if a matching key
     * is required and the primary key contains nested or empty parts
     */
    fun make(stream: ConfiguredAirbyteStream): DestinationStream =
        DestinationStream(
            unmappedNamespace = stream.stream.namespace,
            unmappedName = stream.stream.name,
            namespaceMapper = namespaceMapper,
            importType = importTypeFor(stream),
            generationId = stream.generationId,
            minimumGenerationId = stream.minimumGenerationId,
            syncId = stream.syncId,
            schema = jsonSchemaToAirbyteType.convert(stream.stream.jsonSchema),
            isFileBased = stream.stream.isFileBased ?: false,
            includeFiles = stream.includeFiles ?: false,
            destinationObjectName = stream.destinationObjectName,
            matchingKey =
                stream.destinationObjectName?.let { _ ->
                    fromCompositeNestedKeyToCompositeKey(stream.primaryKey)
                }
        )

    /** Maps the stream's destination sync mode onto the matching [ImportType]. */
    private fun importTypeFor(stream: ConfiguredAirbyteStream): ImportType =
        when (stream.destinationSyncMode) {
            DestinationSyncMode.APPEND -> Append
            DestinationSyncMode.OVERWRITE -> Overwrite
            DestinationSyncMode.APPEND_DEDUP ->
                Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField)
            DestinationSyncMode.UPDATE -> Update
            DestinationSyncMode.SOFT_DELETE -> SoftDelete
            null -> throw IllegalArgumentException("Destination sync mode was null")
        }
}

/**
 * Flattens a composite key whose parts are single-element paths into a plain list of names.
 *
 * The nested-key check runs over the whole key before the empty-part check, so a key that
 * violates both rules reports the nested-key error.
 *
 * @param compositeNestedKey the composite key; each part must contain exactly one element
 * @return the single element of each part, in the original order
 * @throws IllegalArgumentException if any part has more than one element, or if any part is
 * empty
 */
private fun fromCompositeNestedKeyToCompositeKey(
    compositeNestedKey: List<List<String>>
): List<String> {
    require(compositeNestedKey.none { part -> part.size > 1 }) {
        "Nested keys are not supported for matching keys. Key was $compositeNestedKey"
    }
    require(compositeNestedKey.all { part -> part.isNotEmpty() }) {
        "Parts of the composite key need to have at least one element. Key was $compositeNestedKey"
    }

    // Every part holds exactly one element at this point.
    return compositeNestedKey.map { part -> part.first() }
}

sealed interface ImportType

data object Append : ImportType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command

import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.schema.TableSchemaFactory
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton

// File-private logger obtained from KotlinLogging.
// NOTE(review): nothing in the visible code of this file references `log` - confirm it is needed.
private val log = KotlinLogging.logger {}

/**
 * Converts protocol-level [ConfiguredAirbyteStream]s into [DestinationStream]s, including the
 * table schema built for each stream's resolved table name.
 *
 * @property jsonSchemaToAirbyteType converts a stream's JSON schema into the stream's schema type
 * @property namespaceMapper mapper handed to every stream this factory creates
 * @property schemaFactory builds the table schema from the table name, columns and import type
 */
@Singleton
class DestinationStreamFactory(
    private val jsonSchemaToAirbyteType: JsonSchemaToAirbyteType,
    private val namespaceMapper: NamespaceMapper,
    private val schemaFactory: TableSchemaFactory,
) {
    /**
     * Builds the [DestinationStream] for one configured stream.
     *
     * The stream's JSON schema must convert to an [ObjectType], because its properties are what
     * the table schema is built from. The matching key is only derived from the primary key when
     * the stream declares a destination object name; otherwise it is null.
     *
     * @param stream the configured stream to convert
     * @param resolvedTableName the table name already resolved for this stream
     * @return the equivalent [DestinationStream], carrying its table schema
     * @throws IllegalArgumentException if the schema does not convert to an [ObjectType], if the
     * destination sync mode is null, or if a matching key is required and the primary key contains
     * nested or empty parts
     */
    fun make(stream: ConfiguredAirbyteStream, resolvedTableName: TableName): DestinationStream {
        val convertedSchema = jsonSchemaToAirbyteType.convert(stream.stream.jsonSchema)
        // A safe cast turns an opaque ClassCastException into an error that names the stream.
        val airbyteSchema =
            convertedSchema as? ObjectType
                ?: throw IllegalArgumentException(
                    "Stream ${stream.stream.namespace}.${stream.stream.name} must have an " +
                        "object schema, but its JSON schema converted to $convertedSchema",
                )
        val importType =
            when (stream.destinationSyncMode) {
                null -> throw IllegalArgumentException("Destination sync mode was null")
                DestinationSyncMode.APPEND -> Append
                DestinationSyncMode.OVERWRITE -> Overwrite
                DestinationSyncMode.APPEND_DEDUP ->
                    Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField)
                DestinationSyncMode.UPDATE -> Update
                DestinationSyncMode.SOFT_DELETE -> SoftDelete
            }
        val tableSchema =
            schemaFactory.make(
                resolvedTableName,
                airbyteSchema.properties,
                importType,
            )

        return DestinationStream(
            unmappedNamespace = stream.stream.namespace,
            unmappedName = stream.stream.name,
            namespaceMapper = namespaceMapper,
            importType = importType,
            generationId = stream.generationId,
            minimumGenerationId = stream.minimumGenerationId,
            syncId = stream.syncId,
            schema = airbyteSchema,
            isFileBased = stream.stream.isFileBased ?: false,
            includeFiles = stream.includeFiles ?: false,
            destinationObjectName = stream.destinationObjectName,
            matchingKey =
                stream.destinationObjectName?.let {
                    fromCompositeNestedKeyToCompositeKey(stream.primaryKey)
                },
            tableSchema = tableSchema,
        )
    }

    /**
     * Flattens a composite key whose parts are single-element paths into a plain list of names.
     *
     * The nested-key check runs over the whole key before the empty-part check.
     *
     * @throws IllegalArgumentException if any part has more than one element, or if any part is
     * empty
     */
    private fun fromCompositeNestedKeyToCompositeKey(
        compositeNestedKey: List<List<String>>
    ): List<String> {
        require(compositeNestedKey.none { it.size > 1 }) {
            "Nested keys are not supported for matching keys. Key was $compositeNestedKey"
        }
        require(compositeNestedKey.all { it.isNotEmpty() }) {
            "Parts of the composite key need to have at least one element. Key was $compositeNestedKey"
        }

        // Every part holds exactly one element here; map already returns a fresh list.
        return compositeNestedKey.map { it.first() }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.airbyte.cdk.load.component

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.TableName

/**
* Client interface for database table operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.airbyte.cdk.load.component

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.schema.model.TableName
import io.airbyte.cdk.load.table.ColumnNameMapping
import io.airbyte.cdk.load.table.TableName
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.contains
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading