Skip to content

Commit

Permalink
Bulk Load CDK: AirbyteType & AirbyteValue, marshaling from json
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Sep 16, 2024
1 parent 91013e1 commit 4cfe34c
Show file tree
Hide file tree
Showing 15 changed files with 1,085 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import jakarta.inject.Singleton
* Internal representation of destination streams. This is intended to be a case class specialized
* for usability.
*/
data class DestinationCatalog(
val streams: List<DestinationStream> = emptyList(),
) {
data class DestinationCatalog(val streams: List<DestinationStream> = emptyList()) {
private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
streams.associateBy { it.descriptor }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.cdk.command

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.data.AirbyteType
import io.airbyte.cdk.data.AirbyteTypeToJsonSchema
import io.airbyte.cdk.data.JsonSchemaToAirbyteType
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand All @@ -22,7 +24,7 @@ import jakarta.inject.Singleton
data class DestinationStream(
val descriptor: Descriptor,
val importType: ImportType,
val schema: ObjectNode,
val schema: AirbyteType,
val generationId: Long,
val minimumGenerationId: Long,
val syncId: Long,
Expand All @@ -44,7 +46,7 @@ data class DestinationStream(
AirbyteStream()
.withNamespace(descriptor.namespace)
.withName(descriptor.name)
.withJsonSchema(schema)
.withJsonSchema(AirbyteTypeToJsonSchema().convert(schema))
)
.withGenerationId(generationId)
.withMinimumGenerationId(minimumGenerationId)
Expand Down Expand Up @@ -83,10 +85,10 @@ class DestinationStreamFactory {
DestinationSyncMode.APPEND_DEDUP ->
Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField)
},
schema = stream.stream.jsonSchema as ObjectNode,
generationId = stream.generationId,
minimumGenerationId = stream.minimumGenerationId,
syncId = stream.syncId,
schema = JsonSchemaToAirbyteType().convert(stream.stream.jsonSchema)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.data

sealed interface AirbyteType

data object NullType : AirbyteType

data object StringType : AirbyteType

data object BooleanType : AirbyteType

data object IntegerType : AirbyteType

data object NumberType : AirbyteType

data object DateType : AirbyteType

data class TimestampType(val hasTimezone: Boolean) : AirbyteType

data class TimeType(val hasTimezone: Boolean) : AirbyteType

data class ArrayType(val items: FieldType) : AirbyteType

data object ArrayTypeWithoutSchema : AirbyteType

data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType

data object ObjectTypeWithEmptySchema : AirbyteType

data object ObjectTypeWithoutSchema : AirbyteType

data class UnionType(val options: List<AirbyteType>) : AirbyteType

data class UnknownType(val what: String) : AirbyteType

data class FieldType(val type: AirbyteType, val nullable: Boolean)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.data

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode

class AirbyteTypeToJsonSchema {
private fun ofType(typeName: String): ObjectNode {
return JsonNodeFactory.instance.objectNode().put("type", typeName)
}

fun convert(airbyteType: AirbyteType): JsonNode {
return when (airbyteType) {
is NullType -> ofType("null")
is StringType -> ofType("string")
is BooleanType -> ofType("boolean")
is IntegerType -> ofType("integer")
is NumberType -> ofType("number")
is ArrayType ->
JsonNodeFactory.instance
.objectNode()
.put("type", "array")
.set("items", fromFieldType(airbyteType.items))
is ArrayTypeWithoutSchema -> ofType("array")
is ObjectType -> {
val objNode = ofType("object")
val properties = objNode.putObject("properties")
airbyteType.properties.forEach { (name, field) ->
properties.replace(name, fromFieldType(field))
}
objNode
}
is ObjectTypeWithoutSchema -> ofType("object")
is ObjectTypeWithEmptySchema -> {
val objectNode = ofType("object")
objectNode.putObject("properties")
objectNode
}
is UnionType -> {
val unionNode = JsonNodeFactory.instance.objectNode()
val unionOptions = unionNode.putArray("oneOf")
airbyteType.options.forEach { unionOptions.add(convert(it)) }
unionNode
}
is DateType -> ofType("string").put("format", "date")
is TimeType -> {
val timeNode = ofType("string").put("format", "time")
if (airbyteType.hasTimezone) {
timeNode.put("airbyte_type", "time_with_timezone")
} else {
timeNode.put("airbyte_type", "time_without_timezone")
}
}
is TimestampType -> {
val timestampNode = ofType("string").put("format", "date-time")
if (airbyteType.hasTimezone) {
timestampNode.put("airbyte_type", "timestamp_with_timezone")
} else {
timestampNode.put("airbyte_type", "timestamp_without_timezone")
}
}
else -> throw IllegalArgumentException("Unknown type: $airbyteType")
}
}

private fun fromFieldType(field: FieldType): JsonNode {
if (field.nullable) {
if (field.type is UnionType) {
return convert(UnionType(options = field.type.options + NullType))
}
return convert(UnionType(options = listOf(field.type, NullType)))
}
return convert(field.type)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.data

import java.math.BigDecimal

sealed interface AirbyteValue

data object NullValue : AirbyteValue

@JvmInline value class StringValue(val value: String) : AirbyteValue

@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue

@JvmInline value class IntegerValue(val value: Long) : AirbyteValue

@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue

@JvmInline value class DateValue(val value: String) : AirbyteValue

@JvmInline value class TimestampValue(val value: String) : AirbyteValue

@JvmInline value class TimeValue(val value: String) : AirbyteValue

@JvmInline value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue

@JvmInline value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue

@JvmInline value class UnknownValue(val what: String) : AirbyteValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.data

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory

class AirbyteValueToJson {
fun convert(value: AirbyteValue): JsonNode {
return when (value) {
is ArrayValue ->
JsonNodeFactory.instance.arrayNode().addAll(value.values.map { convert(it) })
is BooleanValue -> JsonNodeFactory.instance.booleanNode(value.value)
is DateValue -> JsonNodeFactory.instance.textNode(value.value)
is IntegerValue -> JsonNodeFactory.instance.numberNode(value.value)
is NullValue -> JsonNodeFactory.instance.nullNode()
is NumberValue -> JsonNodeFactory.instance.numberNode(value.value)
is ObjectValue -> {
val objNode = JsonNodeFactory.instance.objectNode()
value.values.forEach { (name, field) -> objNode.replace(name, convert(field)) }
objNode
}
is StringValue -> JsonNodeFactory.instance.textNode(value.value)
is TimeValue -> JsonNodeFactory.instance.textNode(value.value)
is TimestampValue -> JsonNodeFactory.instance.textNode(value.value)
is UnknownValue -> throw IllegalArgumentException("Unknown value: $value")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.data

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode

class JsonSchemaToAirbyteType {
fun convert(schema: JsonNode): AirbyteType {
// try {
if (schema.isObject && schema.has("type")) {
// Normal json object with {"type": ..., ...}
val schemaType = (schema as ObjectNode).get("type")
return if (schemaType.isTextual) {
// {"type": <string>, ...}
when (schema.get("type").asText()) {
"string" -> fromString(schema)
"boolean" -> BooleanType
"integer" -> IntegerType
"number" -> fromNumber(schema)
"array" -> fromArray(schema)
"object" -> fromObject(schema)
"null" -> NullType
else ->
throw IllegalArgumentException(
"Unknown type: ${
schema.get("type").asText()
}"
)
}
} else if (schemaType.isArray) {
// {"type": [...], ...}
unionFromCombinedTypes(schemaType.toList(), schema)
} else {
UnknownType("unspported type for 'type' field: $schemaType")
}
} else if (schema.isObject) {
// {"oneOf": [...], ...} or {"anyOf": [...], ...} or {"allOf": [...], ...}
val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf")
return if (options != null) {
UnionType(options.map { convert(it as ObjectNode) })
} else {
// Default to object if no type and not a union type
convert((schema as ObjectNode).put("type", "object"))
}
} else if (schema.isTextual) {
// "<typename>"
val typeSchema = JsonNodeFactory.instance.objectNode().put("type", schema.asText())
return convert(typeSchema)
} else {
return UnknownType("Unknown schema type: $schema")
}
} // catch (t: Throwable) {
// return UnknownType(t.message ?: "Unknown error")
// }
// }

private fun fromString(schema: ObjectNode): AirbyteType =
when (schema.get("format")?.asText()) {
"date" -> DateType
"time" ->
TimeType(
hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone"
)
"date-time" ->
TimestampType(
hasTimezone =
schema.get("airbyte_type")?.asText() != "timestamp_without_timezone"
)
null -> StringType
else ->
throw IllegalArgumentException(
"Unknown string format: ${
schema.get("format").asText()
}"
)
}

private fun fromNumber(schema: ObjectNode): AirbyteType =
if (schema.get("airbyte_type")?.asText() == "integer") {
IntegerType
} else {
NumberType
}

private fun fromArray(schema: ObjectNode): AirbyteType {
val items = schema.get("items") ?: return ArrayTypeWithoutSchema
if (items.isArray) {
if (items.isEmpty) {
return ArrayTypeWithoutSchema
}
val itemOptions = UnionType(items.map { convert(it) })
return ArrayType(fieldFromUnion(itemOptions))
}
return ArrayType(fieldFromSchema(items as ObjectNode))
}

private fun fromObject(schema: ObjectNode): AirbyteType {
val properties = schema.get("properties") ?: return ObjectTypeWithoutSchema
if (properties.isEmpty) {
return ObjectTypeWithEmptySchema
}
val requiredFields = schema.get("required")?.map { it.asText() } ?: emptyList()
return objectFromProperties(properties as ObjectNode, requiredFields)
}

private fun fieldFromSchema(
fieldSchema: ObjectNode,
onRequiredList: Boolean = false
): FieldType {
val markedRequired = fieldSchema.get("required")?.asBoolean() ?: false
val nullable = !(onRequiredList || markedRequired)
val airbyteType = convert(fieldSchema)
if (airbyteType is UnionType) {
return fieldFromUnion(airbyteType, nullable)
} else {
return FieldType(airbyteType, nullable)
}
}

private fun fieldFromUnion(unionType: UnionType, nullable: Boolean = false): FieldType {
if (unionType.options.contains(NullType)) {
val filtered = unionType.options.filter { it != NullType }
return FieldType(UnionType(filtered), nullable = true)
}
return FieldType(unionType, nullable = nullable)
}

private fun objectFromProperties(schema: ObjectNode, requiredFields: List<String>): ObjectType {
val properties =
schema
.fields()
.asSequence()
.map { (name, node) ->
name to fieldFromSchema(node as ObjectNode, requiredFields.contains(name))
}
.toMap(LinkedHashMap())
return ObjectType(properties)
}

private fun unionFromCombinedTypes(
options: List<JsonNode>,
parentSchema: ObjectNode
): UnionType {
// Denormalize the properties across each type (the converter only checks what matters
// per type).
val unionOptions =
options.map {
if (it.isTextual) {
val schema = parentSchema.deepCopy()
schema.put("type", it.textValue())
convert(schema)
} else {
convert(it)
}
}
return UnionType(unionOptions)
}
}
Loading

0 comments on commit 4cfe34c

Please sign in to comment.