-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serialized Message Consumer #26700
Serialized Message Consumer #26700
Conversation
Affected Connector ReportNOTE
|
Connector | Version | Changelog | Publish |
---|---|---|---|
source-alloydb |
2.0.28 |
✅ | ✅ |
source-alloydb-strict-encrypt |
2.0.28 |
🔵 (ignored) |
🔵 (ignored) |
source-azure-blob-storage |
0.1.0 |
✅ | ✅ |
source-bigquery |
0.2.3 |
✅ | ✅ |
source-clickhouse |
0.1.17 |
✅ | ✅ |
source-clickhouse-strict-encrypt |
0.1.17 |
🔵 (ignored) |
🔵 (ignored) |
source-cockroachdb |
0.1.22 |
✅ | ✅ |
source-cockroachdb-strict-encrypt |
0.1.22 |
🔵 (ignored) |
🔵 (ignored) |
source-db2 |
0.1.19 |
✅ | ✅ |
source-db2-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
source-dynamodb |
0.1.2 |
✅ | ✅ |
source-e2e-test |
2.1.4 |
✅ | ✅ |
source-e2e-test-cloud |
2.1.4 |
🔵 (ignored) |
🔵 (ignored) |
source-elasticsearch |
0.1.1 |
✅ | ✅ |
source-jdbc |
0.3.5 |
🔵 (ignored) |
🔵 (ignored) |
source-kafka |
0.2.3 |
✅ | ✅ |
source-mongodb-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
source-mongodb-v2 |
0.1.19 |
✅ | ✅ |
source-mssql |
1.0.17 |
✅ | ✅ |
source-mssql-strict-encrypt |
1.0.17 |
🔵 (ignored) |
🔵 (ignored) |
source-mysql |
2.0.24 |
✅ | ✅ |
source-mysql-strict-encrypt |
2.0.24 |
🔵 (ignored) |
🔵 (ignored) |
source-oracle |
0.3.24 |
✅ | ✅ |
source-oracle-strict-encrypt |
0.3.24 |
🔵 (ignored) |
🔵 (ignored) |
source-postgres |
2.0.31 |
✅ | ✅ |
source-postgres-strict-encrypt |
2.0.31 |
🔵 (ignored) |
🔵 (ignored) |
source-redshift |
0.3.16 |
✅ | ✅ |
source-relational-db |
0.3.1 |
🔵 (ignored) |
🔵 (ignored) |
source-scaffold-java-jdbc |
0.1.0 |
🔵 (ignored) |
🔵 (ignored) |
source-sftp |
0.1.2 |
✅ | ✅ |
source-snowflake |
0.1.34 |
✅ | ✅ |
source-teradata |
0.1.0 |
✅ | ✅ |
source-tidb |
0.2.4 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
❌ Destinations (50)
Connector | Version | Changelog | Publish |
---|---|---|---|
destination-azure-blob-storage |
0.2.0 |
✅ | ✅ |
destination-bigquery |
1.4.4 |
✅ | ✅ |
destination-bigquery-denormalized |
1.4.1 |
✅ | ✅ |
destination-cassandra |
0.1.4 |
✅ | ✅ |
destination-clickhouse |
0.2.3 |
✅ | ✅ |
destination-clickhouse-strict-encrypt |
0.2.3 |
🔵 (ignored) |
🔵 (ignored) |
destination-csv |
1.0.0 |
✅ | ✅ |
destination-databricks |
1.0.2 |
✅ | ✅ |
destination-dev-null |
0.3.0 |
🔵 (ignored) |
🔵 (ignored) |
destination-doris |
0.1.0 |
✅ | ✅ |
destination-dynamodb |
0.1.7 |
✅ | ✅ |
destination-e2e-test |
0.3.0 |
✅ | ✅ |
destination-elasticsearch |
0.1.6 |
✅ | ✅ |
destination-elasticsearch-strict-encrypt |
0.1.6 |
🔵 (ignored) |
🔵 (ignored) |
destination-exasol |
0.1.1 |
✅ | ✅ |
destination-gcs |
0.3.0 |
✅ | ✅ |
destination-iceberg |
0.1.0 |
✅ | ✅ |
destination-kafka |
0.1.10 |
✅ | ✅ |
destination-keen |
0.2.4 |
✅ | ✅ |
destination-kinesis |
0.1.5 |
✅ | ✅ |
destination-local-json |
0.2.11 |
✅ | ✅ |
destination-mariadb-columnstore |
0.1.7 |
✅ | ✅ |
destination-mongodb |
0.1.9 |
✅ | ✅ |
destination-mongodb-strict-encrypt |
0.1.9 |
🔵 (ignored) |
🔵 (ignored) |
destination-mqtt |
0.1.3 |
✅ | ✅ |
destination-mssql |
0.1.23 |
✅ | ✅ |
destination-mssql-strict-encrypt |
0.1.23 |
🔵 (ignored) |
🔵 (ignored) |
destination-mysql |
0.1.20 |
✅ | ✅ |
destination-mysql-strict-encrypt |
❌ 0.1.21 (mismatch: 0.1.20 ) |
🔵 (ignored) |
🔵 (ignored) |
destination-oracle |
0.1.19 |
✅ | ✅ |
destination-oracle-strict-encrypt |
0.1.19 |
🔵 (ignored) |
🔵 (ignored) |
destination-postgres |
0.3.27 |
✅ | ✅ |
destination-postgres-strict-encrypt |
0.3.27 |
🔵 (ignored) |
🔵 (ignored) |
destination-pubsub |
0.2.0 |
✅ | ✅ |
destination-pulsar |
0.1.3 |
✅ | ✅ |
destination-r2 |
0.1.0 |
✅ | ✅ |
destination-redis |
0.1.4 |
✅ | ✅ |
destination-redpanda |
0.1.0 |
✅ | ✅ |
destination-redshift |
0.4.8 |
✅ | ✅ |
destination-rockset |
0.1.4 |
✅ | ✅ |
destination-s3 |
0.4.1 |
✅ | ✅ |
destination-s3-glue |
0.1.7 |
✅ | ✅ |
destination-scylla |
0.1.3 |
✅ | ✅ |
destination-selectdb |
0.1.0 |
✅ | ✅ |
destination-snowflake |
1.0.4 |
✅ | ✅ |
destination-starburst-galaxy |
0.0.1 |
✅ | ✅ |
destination-teradata |
0.1.1 |
✅ | ✅ |
destination-tidb |
0.1.1 |
✅ | ❌ (diff seed version) |
destination-vertica |
0.1.0 |
✅ | ✅ |
destination-yugabytedb |
0.1.1 |
✅ | ✅ |
- See "Actionable Items" below for how to resolve warnings and errors.
👀 Other Modules (1)
- base-normalization
Actionable Items
(click to expand)
Category | Status | Actionable Item |
---|---|---|
Version | ❌ mismatch |
The version of the connector is different from its normal variant. Please bump the version of the connector. |
⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
|
Changelog | ⚠ doc not found |
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug. |
❌ changelog missing |
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog. | |
Publish | ⚠ not in seed |
The connector is not in the cloud or oss registry, so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that you have added a metadata.yaml file and the expected registries are enabled. |
3dba253
to
70d6e01
Compare
cbc2852
to
4ccd5c7
Compare
// final boolean useAsyncSnowflake = config.has("loading_method") | ||
// && config.get("loading_method").has("method") | ||
// && config.get("loading_method").get("method").asText().equals("Internal Staging"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uncommenting these lines of code will all we will need to do to "turn on" the async version of the destination. as we do a feature flagged roll out, this will be the only diff in that feature branch.
4ccd5c7
to
ac71c2d
Compare
* @throws Exception exception | ||
*/ | ||
@Override | ||
void accept(String message, Integer sizeInBytes) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: should sizeInBytes
be a long instead of an int? although seems very unlikely we'd have a record > 2.3GB, not sure we want to bake this assumption into the interface. might be useful for this to be more flexible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't have a strong opinion. i think we said fivetran limits record size to 200MB, so that seems like it gives us a fair amount of buffer. happy to swtich it.
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE} | ||
* message, processing is halted. Otherwise, the invalid message is logged and execution continues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: probably useful to add more details why i.e. STATE messages represent application state and we want to fail fast to ensure data correctness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah - didn't realise this was moved from an existing class.
...yte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
final Procedure consumeWriteStreamCallable = () -> { | ||
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for understanding: after this merge, we should be able to get rid of getConsumer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can only get rid of getConsumer
once all destinations are migrated over to use the new thing. but for snowflake, yes, in subsequent PRs it overrides this method instead.
throws Exception { | ||
consumer.start(); | ||
|
||
final byte[] buffer = new byte[8192]; // 8K buffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any specific reason to use 8k?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this from somewhere else? looks familiar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8k bytes is the default buffer size for BufferedInputStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to what ryan said.
...tegrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% the shim is in the right place. It looks like SwitchingDestination
is where this should go. I don't think this is blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the manual system.in parsing scares me. Open to being convinced that I'm being overcautious though.
other than that, agree that this is a noop. had a few comments for my own learning + one nitpick.
...yte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
Show resolved
Hide resolved
...yte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
Show resolved
Hide resolved
...yte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
Show resolved
Hide resolved
...tegrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
Show resolved
Hide resolved
...tegrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
Show resolved
Hide resolved
/test connector=connectors/destination-snowflake
Build FailedTest summary info:
|
/test connector=connectors/destination-bigquery
Build FailedTest summary info:
|
…integrations/base/Destination.java Co-authored-by: Edward Gao <edward.gao@airbyte.io>
…integrations/base/Destination.java Co-authored-by: Davin Chia <davinchia@gmail.com>
} | ||
}; | ||
|
||
watchForOrphanThreads( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for context, we moved this because on master it is getting called before the close
is called on the destination. this doesn't make sense, because it is checking if the destination cleaned itself up properly. that's a ridiculous thing to change before you let the destination call close
. now this only gets called after the destination calls close
…integrations/base/IntegrationRunner.java Co-authored-by: Davin Chia <davinchia@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of my comments are blocking, but the /test
seems to be failing:
BigQueryStandardDestinationAcceptanceTest > testCustomDbtTransformations() FAILED
io.airbyte.workers.exception.TestHarnessException: dbt debug Failed.
at app//io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest.testCustomDbtTransformations(DestinationAcceptanceTest.java:967)
So something isn't working quite right
if (isStateMessage(inputString)) { | ||
throw new IllegalStateException("Invalid state message: " + inputString); | ||
} else { | ||
log.error("Received invalid message: " + inputString); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I believe that all other types of messages that a source might emit (log, control, etc) should not be passed to the destination, so STATE messages are the only other types to check for which are meaningful
*/ | ||
@SuppressWarnings("OptionalIsPresent") | ||
private static boolean isStateMessage(final String input) { | ||
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure you tested this, but just for my own understanding, how does this work? Since there's only one top-level property, with the JSON parser only deser the top level, leaving RECORD: {}
as a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @davinchia remember when we tried this in mexico with just string.contains()
? this is much better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah exactly. it just deserializes type and ignores the rest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw, jonathan pearlin is the one who set this up. this has been in prod for a while.
while ((bytesRead = bis.read(buffer)) != -1) { | ||
for (int i = 0; i < bytesRead; i++) { | ||
final byte b = buffer[i]; | ||
if (b == '\n' || b == '\r') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Any worries about "encoded" newline characters within the records themselves ({body: "hello\nworld"}
)? Is there a builtin for sytem.newline
or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a test in IntegrationRunnerBackwardsCompatbilityTest.java
to make sure the behavior is the same between what we have on master and this branch to make sure we don't mess anything up.
/test connector=connectors/destination-redshift
Build FailedTest summary info:
|
/test connector=connectors/destination-snowflake
Build FailedTest summary info:
|
/test connector=connectors/destination-bigquery
Build FailedTest summary info:
|
running tests on "master" here to compare test results |
This reverts commit 75240b0.
/test connector=connectors/destination-redshift
Build FailedTest summary info:
|
/test connector=connectors/destination-snowflake
Build FailedTest summary info:
|
/test connector=connectors/destination-bigquery
Build FailedTest summary info:
|
…"" This reverts commit a89d5bc.
/test connector=connectors/destination-redshift
Build FailedTest summary info:
|
/test connector=connectors/destination-snowflake
Build FailedTest summary info:
|
/test connector=connectors/destination-bigquery
Build FailedTest summary info:
|
This reverts commit 81f44d9.
What
Destination
interface that does just (SerializedAirbyteMessageConsumer
). It is default to just delegate to the oldAirbyteMessageConsumer
. This makes it backwards compatible and a no op for existing Destinations. TheIntegrationRunner
to calls the interface method on the Destination.SnowflakeDestination.java
for how we would turn on the async version of the Snowflake Destination. As we roll it out with the feature flag, those 3 lines of code will all that needs to be maintained on the feature branch.Recommended reading order
airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SerializedAirbyteMessageConsumer.java
airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
🚨 User Impact 🚨
No! That's the whole point of this PR it's a no op from the user's point of view, but sets up the async snowflake destination.