Bulk Load CDK: AirbyteType & AirbyteValue, marshaling from json #45430
braindumping some stuff - we maybe should chat live about this tomorrow, I'm getting more nervous :P (but again, maybe I'm overreacting)
also, one comment around backwards-compat in the schema conversion + a few nits.
- should/can we match existing behavior? (are we going to accidentally lose precision in certain cases, fail to cast certain edge cases, etc)
- db/dw destinations - will this accidentally change how we write the raw data, in ways that are hard to catch?
- avro - 🤷 🤷 🤷
- edge case behavior change: consider this scenario
  - source declares field updated_at as {type: string, format: date-time, airbyte_type: date}
  - source emits records with {updated_at: 2024-01-23T12:34:56Z} (i.e. full timestamptz)
  - warehouse destination writes the raw record with full TZ, then creates the final table with a date column
  - source later updates the schema to be {type: string, .... airbyte_type: timestamptz}
  - destination rebuilds the final table, which now contains the full timestamp
  - if we're typing in-connector, then the raw table will contain just the truncated date 2024-01-23, so even if the source fixes the schema, the user has to resync to get the fixed data - do we care?
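To make the truncation in that scenario concrete, here is a minimal sketch using `java.time`. The function name `truncateToDate` is hypothetical; it only stands in for whatever in-connector typing would do to a value whose declared type is `date`.

```kotlin
import java.time.LocalDate
import java.time.OffsetDateTime

// Hypothetical stand-in for in-connector typing of a field declared as `date`:
// parse the emitted timestamptz and keep only the calendar date.
fun truncateToDate(raw: String): LocalDate = OffsetDateTime.parse(raw).toLocalDate()

fun main() {
    val emitted = "2024-01-23T12:34:56Z"
    val stored = truncateToDate(emitted)
    println(stored) // 2024-01-23
    // The time of day and the offset are no longer present in `stored`, so a later
    // schema fix to timestamptz cannot recover them without a resync.
}
```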
package io.airbyte.cdk.data

sealed class AirbyteType

Suggested change:
- sealed class AirbyteType
+ sealed interface AirbyteType
Fixed.
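For context, a small sketch of what the `sealed interface` form looks like in use. The subtypes shown (`StringType`, `IntegerType`, `ArrayType`) and the `describe` helper are illustrative assumptions, not necessarily the set defined in this PR. The point is that `when` stays exhaustive without an `else` branch, and implementors are free to be objects, data classes, or value classes.

```kotlin
// Illustrative subtypes only; the real hierarchy in the PR may differ.
sealed interface AirbyteType

object StringType : AirbyteType

object IntegerType : AirbyteType

data class ArrayType(val items: AirbyteType) : AirbyteType

// Exhaustive `when` over the sealed hierarchy: no `else` branch is needed,
// and adding a new subtype turns this into a compile error until it is handled.
fun describe(type: AirbyteType): String =
    when (type) {
        is StringType -> "string"
        is IntegerType -> "integer"
        is ArrayType -> "array<${describe(type.items)}>"
    }

fun main() {
    println(describe(ArrayType(IntegerType))) // array<integer>
}
```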
import com.fasterxml.jackson.databind.node.ObjectNode

class JsonSchemaToAirbyteType {
    fun convert(schema: ObjectNode): AirbyteType {
intrusive thought: this seems pretty critical to get right, otherwise we're going to change user-facing data types. is there some way we can test this implementation for equivalence to the existing implementations? (and/or do we want to do a one-time breaking release, to get ourselves into a better state?)
+ should we check for the existence of weird schemas where this implementation behaves differently? E.g. {type: string, airbyte_type: timestamp_with_timezone}: this implementation turns that into plain String, whereas I suspect existing implementations would generate Timestamp(hasTimezone=true)
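A rough sketch of the behavior suspected of the existing implementations, where `airbyte_type` is honored even when `format` is absent. Everything here is an assumption for illustration: the schema is modeled as a plain `Map<String, String>` rather than a Jackson `ObjectNode`, the type names `StringType` and `TimestampType` are hypothetical, and treating a bare `format: date-time` as timezone-aware is an assumed default.

```kotlin
// Hypothetical types; the PR's actual class names may differ.
sealed interface AirbyteType

object StringType : AirbyteType

data class TimestampType(val hasTimezone: Boolean) : AirbyteType

// Sketch for a schema with `type: string`: check airbyte_type first,
// then fall back to `format`, then to a plain string.
fun convertString(schema: Map<String, String>): AirbyteType {
    when (schema["airbyte_type"]) {
        "timestamp_with_timezone" -> return TimestampType(hasTimezone = true)
        "timestamp_without_timezone" -> return TimestampType(hasTimezone = false)
    }
    return when (schema["format"]) {
        // Assumption: date-time without an airbyte_type is timezone-aware.
        "date-time" -> TimestampType(hasTimezone = true)
        else -> StringType
    }
}

fun main() {
    val weird = mapOf("type" to "string", "airbyte_type" to "timestamp_with_timezone")
    println(convertString(weird)) // TimestampType(hasTimezone=true)
}
```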
Well, we're going to have to write the reverse mapper to get this back into json for test inputs anyway
So, we'll have a map <-> reverse map
So, a first step for converting a destination could be to put that into the deserialization path on the old destination & then run DATs?
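One cheap check that the map and reverse map make possible, before wiring anything into an old destination, is a round-trip test: convert a schema, convert it back, and compare. The sketch below uses hypothetical names (`toAirbyteType`, `toJsonSchema`, `roundTrips`) and a `Map<String, String>` in place of real JSON, so it only illustrates the idea.

```kotlin
// Hypothetical, deliberately tiny type model.
sealed interface AirbyteType

object StringType : AirbyteType

object IntegerType : AirbyteType

// Forward map (schema -> type). Note that it ignores airbyte_type entirely.
fun toAirbyteType(schema: Map<String, String>): AirbyteType =
    when (schema["type"]) {
        "integer" -> IntegerType
        else -> StringType
    }

// Reverse map (type -> schema).
fun toJsonSchema(type: AirbyteType): Map<String, String> =
    when (type) {
        is IntegerType -> mapOf("type" to "integer")
        is StringType -> mapOf("type" to "string")
    }

// A schema round-trips if mapping forward and back reproduces it exactly.
fun roundTrips(schema: Map<String, String>): Boolean =
    toJsonSchema(toAirbyteType(schema)) == schema

fun main() {
    println(roundTrips(mapOf("type" to "integer"))) // true
    // The forward map drops airbyte_type, so this schema does not survive.
    println(roundTrips(mapOf("type" to "string", "airbyte_type" to "timestamp_with_timezone"))) // false
}
```

A failing round trip does not prove a behavior change on its own, but it flags schemas where information is dropped and that are worth comparing against the existing implementations.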
Also, re the weird implementation, my existing avro/parquet path probably misses that one as well.
fair point (my gut thought was that it would be hard to plug this code into an old destination, but that feels solvable) 🤷
I tend to err conservative on these behaviors, but that's probably overcautious, given that the s3 rollout went fine 🚛
private fun fromUnionOptions(options: List<AirbyteType>): FieldType {
    if (options.size == 1) {
        return FieldType(options.first(), nullable = false)
Suggested change:
- return FieldType(options.first(), nullable = false)
+ return FieldType(options.first(), nullable = true)
(I think?)
options.fold(mutableSetOf<AirbyteType>()) { acc, type ->
    if (acc.contains(type)) {
        // Do nothing: type exists and is identical
    } else if (type is ObjectType && acc.any { it is ObjectType }) {
should this type merging be opt in? i.e. could there be a destination that supports Union(Object(properties...), Object(differentProperties)) or Union(Union(opt1, opt2), opt3) as an actual type, and do we care
(sorry :( I only started thinking about this stuff after starting to review this pr. Would have been much better if I'd thought of these things before you started implementation)
27be8c0 to 27f0858
seems mostly right, had some specific comments in the value handling.
(I only skimmed most of the schema logic + didn't read the tests at all - lmk if you want me to take a deeper read through)
        ArrayValue(json.map { convert(it, schema.items) })
    }
    is ArrayTypeWithoutSchema ->
        orUnknown(json, json.isArray) { ArrayValue(json.map { fromAny(it) }) }
shouldn't this be fromUnknown? my understanding is that fromAny converts a real java type back to jsonnode, but here it is a jsonnode
fromAny is now fromJson(jsonNode) and arrays are built by mapping the nodes through it
    )
}
is ObjectTypeWithEmptySchema ->
    orUnknown(json, json.isObject) { objectFromMap(emptyMap<String, FieldType>()) }
I thought we wanted to treat this case identically to ObjectTypeWithoutSchema? this looks like it's just nuking all the fields
I fixed that
@JvmInline value class IntegerValue(val value: Long) : AirbyteValue

@JvmInline value class NumberValue(val value: Double) : AirbyteValue
Suggested change:
- @JvmInline value class NumberValue(val value: Double) : AirbyteValue
+ @JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue
(we've seen loss of precision with float64)
fixed
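A quick illustration of the float64 precision concern. The helper name `survivesDouble` is made up for this example; it checks whether a decimal literal keeps its exact value after a detour through `Double`.

```kotlin
import java.math.BigDecimal

// True if parsing `text` as a Double and widening back to BigDecimal
// yields exactly the same numeric value as parsing it directly.
fun survivesDouble(text: String): Boolean =
    BigDecimal(text).compareTo(BigDecimal(text.toDouble())) == 0

fun main() {
    // More significant digits than a 64-bit float can hold.
    println(survivesDouble("12345678901234567890.123456789")) // false
    // Exactly representable in binary floating point.
    println(survivesDouble("0.5")) // true
}
```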
    }
}

private fun fromAny(value: Any?): AirbyteValue {
I don't think I understand this function here - this class is all about converting JsonNode -> AirbyteValue, we should never be handling raw Any? (there's a few calls where it looks like we're passing a JsonNode to fromAny, which sounds wrong)
yeah, I fixed that
@JvmInline value class DateValue(val value: String) : AirbyteValue

@JvmInline value class TimestampValue(val value: String) : AirbyteValue
should we have separate classes for time/timestamp with/without tz?
class TimeWithTimezone(OffsetTime)
class TimeWithoutTimezone(LocalTime)
class TimestampWithTimezone(OffsetDateTime)
class TimestampWithoutTimezone(LocalDateTime)
I've gone back and forth on it. Only if we're converting timestamps I think?
🤷
let's do that for now, and if we run into trouble, we can always switch this around
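For reference, a sketch of what the four-class proposal could look like if it is ever needed. The class names come from the review comment above; the `parseTimestamp` helper and the local `AirbyteValue` declaration are assumptions added only so the example compiles on its own.

```kotlin
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime

// Declared locally so the sketch is self-contained.
sealed interface AirbyteValue

@JvmInline value class TimeWithTimezone(val value: OffsetTime) : AirbyteValue

@JvmInline value class TimeWithoutTimezone(val value: LocalTime) : AirbyteValue

@JvmInline value class TimestampWithTimezone(val value: OffsetDateTime) : AirbyteValue

@JvmInline value class TimestampWithoutTimezone(val value: LocalDateTime) : AirbyteValue

// Hypothetical helper: pick the wrapper from the declared type, not from the
// shape of the string. Parsing throws DateTimeParseException on a mismatch.
fun parseTimestamp(text: String, hasTimezone: Boolean): AirbyteValue =
    if (hasTimezone) TimestampWithTimezone(OffsetDateTime.parse(text))
    else TimestampWithoutTimezone(LocalDateTime.parse(text))

fun main() {
    println(parseTimestamp("2024-01-23T12:34:56Z", hasTimezone = true))
    println(parseTimestamp("2024-01-23T12:34:56", hasTimezone = false))
}
```

The trade-off matches the thread: typed wrappers only pay off if the connector actually parses timestamps, while string-backed values defer that decision.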
a0f7167 to bd807fe
@@ -39,7 +42,8 @@ class DestinationStreamFactory {
             DestinationStream.Descriptor(
                 namespace = stream.stream.namespace,
                 name = stream.stream.name
-            )
+            ),
+            schema = JsonSchemaToAirbyteType().convert(stream.stream.jsonSchema as ObjectNode)
I'm going to remove this cast. I want this never to fail.
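One possible shape for a conversion that never throws, sketched with plain Kotlin types instead of Jackson. `convertOrUnknown`, `UnknownType`, and this `ObjectType` are hypothetical stand-ins; the idea is simply that a safe cast (`as?`) yields `null` where a hard cast (`as`) would throw `ClassCastException`, so the fallback can be an explicit "unknown" type.

```kotlin
// Hypothetical stand-in types for illustration.
sealed interface AirbyteType

object UnknownType : AirbyteType

data class ObjectType(val fieldNames: Set<String>) : AirbyteType

// Accept anything; fall back to UnknownType when the schema is not an object.
fun convertOrUnknown(schema: Any?): AirbyteType {
    val obj = schema as? Map<*, *> ?: return UnknownType
    return ObjectType(obj.keys.map { it.toString() }.toSet())
}

fun main() {
    println(convertOrUnknown(mapOf("id" to "integer"))) // ObjectType(fieldNames=[id])
    println(convertOrUnknown("not an object") === UnknownType) // true
}
```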
83b9123 to 3d3ee41
After rebase it needs reverse mapping to function. :(
for the asProtocolObject() stuff? you could always just ( + disable the tests, I guess.)
3d3ee41 to cc29a6d
cc29a6d to 52a7b4f
What
AirbyteType, AirbyteValue, with the converters in their own classes, and tests.
Limitations: