Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: AirbyteType & AirbyteValue, marshaling from json #45430

Merged
merged 2 commits into from
Sep 16, 2024

Conversation

johnny-schmidt
Copy link
Contributor

What

AirbyteType, AirbyteValue, with the converters in their own classes, and tests.

Limitations:

  • The tests could be more robust (complexly nested types, more edge cases for unions, etc)
  • I'm super happy with the fact that we're so opinionated about merging/validating unions up front; I'd rather that be a discrete mapping step, possibly limited to avro/parquet conversions

@johnny-schmidt johnny-schmidt requested a review from a team as a code owner September 12, 2024 21:24
Copy link

vercel bot commented Sep 12, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Sep 16, 2024 6:36pm

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Sep 12, 2024
Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

braindumping some stuff - we maybe should chat live about this tomorrow, I'm getting more nervous :P (but again, maybe I'm overreacting)

also, one comment around backwards-compat in the schema conversion + a few nits.

  • should/can we match existing behavior? (are we going to accidentally lose precision in certain cases, fail to cast certain edge cases, etc)
    • db/dw destinations - will this accidentally change how we write the raw data, in ways that are hard to catch?
    • avro - 🤷 🤷 🤷
  • edge case behavior change: consider this scenario
    1. source declares field updated_at as {type: string, format: date-time, airbyte_type: date}
    2. source emits records with {updated_at: 2024-01-23T12:34:56Z} (i.e. full timestamptz)
    3. warehouse destination writes the raw record with full TZ, then creates the final table with a date column
    4. source later updates the schema to be {type: string, .... airbyte_type: timestamptz}
    5. destination rebuilds the final table, which now contains the full timestamp
  • if we're typing in-connector, then the raw table will contain just the truncated date 2024-01-23, so even if the source fixes the schema, the user has to resync to get the fixed data - do we care?


package io.airbyte.cdk.data

sealed class AirbyteType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sealed class AirbyteType
sealed interface AirbyteType

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

import com.fasterxml.jackson.databind.node.ObjectNode

class JsonSchemaToAirbyteType {
fun convert(schema: ObjectNode): AirbyteType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intrusive thought: this seems pretty critical to get right, otherwise we're going to change user-facing data types. is there some way we can test this implementation for equivalence to the existing implementations? (and/or do we want to do a one-time breaking release, to get ourselves into a better state?)

+should we check for the existence of weird schemas where this implementation behaves differently. E.g. {type: string, airbyte_type: timestamp_with_timezone} - this implementation turns that into plain String, I suspect existing implementations would generate Timestamp(hasTimezone=true)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we're going to have to write the reverse mapper to get this back into json for test inputs anyway

So, we'll have a map <-> reverse map

So, a first step for converting a destination could be to put that into the deserialization path on the old destination & then run DATs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, re the weird implementation, my existing avro/parquet path probably misses that one as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point (I gut thought it would be hard to plug this code into an old destination, but that feels solvable) 🤷

I tend to err conservative on these behaviors, but that's probably overcautious, given that the s3 rollout went fine 🚛


private fun fromUnionOptions(options: List<AirbyteType>): FieldType {
if (options.size == 1) {
return FieldType(options.first(), nullable = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return FieldType(options.first(), nullable = false)
return FieldType(options.first(), nullable = true)

(I think?)

options.fold(mutableSetOf<AirbyteType>()) { acc, type ->
if (acc.contains(type)) {
// Do nothing: type exists and is identical
} else if (type is ObjectType && acc.any { it is ObjectType }) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this type merging be opt in? i.e. could there be a destination that supports Union(Object(properties...), Object(differentProperties)) or Union(Union(opt1, opt2), opt3) as an actual type, and do we care

@edgao
Copy link
Contributor

edgao commented Sep 12, 2024

(sorry :( I only started thinking about this stuff after starting to review this pr. Would have been much better if I'd thought of these things before you started implementation)

Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems mostly right, had some specific comments in the value handling.

(I only skimmed most of the schema logic + didn't read the tests at all - lmk if you want me to take a deeper read through)

ArrayValue(json.map { convert(it, schema.items) })
}
is ArrayTypeWithoutSchema ->
orUnknown(json, json.isArray) { ArrayValue(json.map { fromAny(it) }) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be fromUnknown? my understanding is that fromAny converts a real java type back to jsonnode, but here it is a jsonnode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fromAny is now fromJson(jsonNode) and arrays are built by mapping the nodes through it

)
}
is ObjectTypeWithEmptySchema ->
orUnknown(json, json.isObject) { objectFromMap(emptyMap<String, FieldType>()) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we wanted to treat this case identically to ObjectTypeWithoutSchema? this looks like it's just nuking all the fields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fixed that


@JvmInline value class IntegerValue(val value: Long) : AirbyteValue

@JvmInline value class NumberValue(val value: Double) : AirbyteValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@JvmInline value class NumberValue(val value: Double) : AirbyteValue
@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue

(we've seen loss of precision with float64)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
}

private fun fromAny(value: Any?): AirbyteValue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand this function here - this class is all about converting JsonNode -> AirbyteValue, we should never be handling raw Any? (there's a few calls where it looks like we're passing a JsonNode to fromAny, which sounds wrong)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I fixed that


@JvmInline value class DateValue(val value: String) : AirbyteValue

@JvmInline value class TimestampValue(val value: String) : AirbyteValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have separate classes for time/timestamp with/without tz?

class TimeWithTimezone(OffsetTime)
class TimeWithoutTimezone(LocalTime)
class TimestampWithTimezone(OffsetDateTime)
class TimestampWithoutTimezone(LocalDateTime)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone back and forth on it. Only if we're converting timestamps I think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷

let's do that for now, and if we run into trouble, we can always switch this around

import com.fasterxml.jackson.databind.node.ObjectNode

class JsonSchemaToAirbyteType {
fun convert(schema: ObjectNode): AirbyteType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point (I gut thought it would be hard to plug this code into an old destination, but that feels solvable) 🤷

I tend to err conservative on these behaviors, but that's probably overcautious, given that the s3 rollout went fine 🚛

@@ -39,7 +42,8 @@ class DestinationStreamFactory {
DestinationStream.Descriptor(
namespace = stream.stream.namespace,
name = stream.stream.name
)
),
schema = JsonSchemaToAirbyteType().convert(stream.stream.jsonSchema as ObjectNode)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to remove this cast. I want this never to fail.

@johnny-schmidt johnny-schmidt force-pushed the issue-9805/airbyte-type-value branch 2 times, most recently from 83b9123 to 3d3ee41 Compare September 13, 2024 19:55
@johnny-schmidt
Copy link
Contributor Author

After rebase it needs reverse mapping to function. :(

@edgao
Copy link
Contributor

edgao commented Sep 13, 2024

reverse mapping

for the asProtocolObject() stuff? you could always just TODO() that :P (or hardcode return emptyObject(), idk)

( + disable the tests, I guess.)

@johnny-schmidt johnny-schmidt merged commit b17df3f into master Sep 16, 2024
29 checks passed
@johnny-schmidt johnny-schmidt deleted the issue-9805/airbyte-type-value branch September 16, 2024 18:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants