Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialized Message Consumer #26700

Merged
merged 10 commits into from
May 31, 2023
Merged

Conversation

cgardens
Copy link
Contributor

@cgardens cgardens commented May 27, 2023

What

  • As part of the async version of the Snowflake Destination, instead of passing the already deserialized messages to the message consumer, we need to pass the raw string (and its size in bytes).
  • This PR changes adds a new consumer to the Destination interface that does just (SerializedAirbyteMessageConsumer). It is default to just delegate to the old AirbyteMessageConsumer. This makes it backwards compatible and a no op for existing Destinations. The IntegrationRunner to calls the interface method on the Destination.
  • I have left the code (but commented out) in SnowflakeDestination.java for how we would turn on the async version of the Snowflake Destination. As we roll it out with the feature flag, those 3 lines of code will all that needs to be maintained on the feature branch.

Recommended reading order

  1. airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Destination.java
  2. airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/SerializedAirbyteMessageConsumer.java
  3. airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java
  4. the rest..

🚨 User Impact 🚨

No! That's the whole point of this PR it's a no op from the user's point of view, but sets up the async snowflake destination.

@github-actions
Copy link
Contributor

github-actions bot commented May 27, 2023

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (33)

Connector Version Changelog Publish
source-alloydb 2.0.28
source-alloydb-strict-encrypt 2.0.28 🔵
(ignored)
🔵
(ignored)
source-azure-blob-storage 0.1.0
source-bigquery 0.2.3
source-clickhouse 0.1.17
source-clickhouse-strict-encrypt 0.1.17 🔵
(ignored)
🔵
(ignored)
source-cockroachdb 0.1.22
source-cockroachdb-strict-encrypt 0.1.22 🔵
(ignored)
🔵
(ignored)
source-db2 0.1.19
source-db2-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
source-dynamodb 0.1.2
source-e2e-test 2.1.4
source-e2e-test-cloud 2.1.4 🔵
(ignored)
🔵
(ignored)
source-elasticsearch 0.1.1
source-jdbc 0.3.5 🔵
(ignored)
🔵
(ignored)
source-kafka 0.2.3
source-mongodb-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
source-mongodb-v2 0.1.19
source-mssql 1.0.17
source-mssql-strict-encrypt 1.0.17 🔵
(ignored)
🔵
(ignored)
source-mysql 2.0.24
source-mysql-strict-encrypt 2.0.24 🔵
(ignored)
🔵
(ignored)
source-oracle 0.3.24
source-oracle-strict-encrypt 0.3.24 🔵
(ignored)
🔵
(ignored)
source-postgres 2.0.31
source-postgres-strict-encrypt 2.0.31 🔵
(ignored)
🔵
(ignored)
source-redshift 0.3.16
source-relational-db 0.3.1 🔵
(ignored)
🔵
(ignored)
source-scaffold-java-jdbc 0.1.0 🔵
(ignored)
🔵
(ignored)
source-sftp 0.1.2
source-snowflake 0.1.34
source-teradata 0.1.0
source-tidb 0.2.4
  • See "Actionable Items" below for how to resolve warnings and errors.

❌ Destinations (50)

Connector Version Changelog Publish
destination-azure-blob-storage 0.2.0
destination-bigquery 1.4.4
destination-bigquery-denormalized 1.4.1
destination-cassandra 0.1.4
destination-clickhouse 0.2.3
destination-clickhouse-strict-encrypt 0.2.3 🔵
(ignored)
🔵
(ignored)
destination-csv 1.0.0
destination-databricks 1.0.2
destination-dev-null 0.3.0 🔵
(ignored)
🔵
(ignored)
destination-doris 0.1.0
destination-dynamodb 0.1.7
destination-e2e-test 0.3.0
destination-elasticsearch 0.1.6
destination-elasticsearch-strict-encrypt 0.1.6 🔵
(ignored)
🔵
(ignored)
destination-exasol 0.1.1
destination-gcs 0.3.0
destination-iceberg 0.1.0
destination-kafka 0.1.10
destination-keen 0.2.4
destination-kinesis 0.1.5
destination-local-json 0.2.11
destination-mariadb-columnstore 0.1.7
destination-mongodb 0.1.9
destination-mongodb-strict-encrypt 0.1.9 🔵
(ignored)
🔵
(ignored)
destination-mqtt 0.1.3
destination-mssql 0.1.23
destination-mssql-strict-encrypt 0.1.23 🔵
(ignored)
🔵
(ignored)
destination-mysql 0.1.20
destination-mysql-strict-encrypt 0.1.21
(mismatch: 0.1.20)
🔵
(ignored)
🔵
(ignored)
destination-oracle 0.1.19
destination-oracle-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
destination-postgres 0.3.27
destination-postgres-strict-encrypt 0.3.27 🔵
(ignored)
🔵
(ignored)
destination-pubsub 0.2.0
destination-pulsar 0.1.3
destination-r2 0.1.0
destination-redis 0.1.4
destination-redpanda 0.1.0
destination-redshift 0.4.8
destination-rockset 0.1.4
destination-s3 0.4.1
destination-s3-glue 0.1.7
destination-scylla 0.1.3
destination-selectdb 0.1.0
destination-snowflake 1.0.4
destination-starburst-galaxy 0.0.1
destination-teradata 0.1.1
destination-tidb 0.1.1
(diff seed version)
destination-vertica 0.1.0
destination-yugabytedb 0.1.1
  • See "Actionable Items" below for how to resolve warnings and errors.

👀 Other Modules (1)

  • base-normalization

Actionable Items

(click to expand)

Category Status Actionable Item
Version
mismatch
The version of the connector is different from its normal variant. Please bump the version of the connector.

doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.
Changelog
doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.

changelog missing
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog.
Publish
not in seed
The connector is not in the cloud or oss registry, so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that you have added a metadata.yaml file and the expected registries are enabled.

@cgardens cgardens force-pushed the cgardens/serialized-message-consumer branch from 3dba253 to 70d6e01 Compare May 27, 2023 05:49
@cgardens cgardens force-pushed the cgardens/serialized-message-consumer branch from cbc2852 to 4ccd5c7 Compare May 27, 2023 06:06
Comment on lines +43 to +45
// final boolean useAsyncSnowflake = config.has("loading_method")
// && config.get("loading_method").has("method")
// && config.get("loading_method").get("method").asText().equals("Internal Staging");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncommenting these lines of code will all we will need to do to "turn on" the async version of the destination. as we do a feature flagged roll out, this will be the only diff in that feature branch.

@cgardens cgardens force-pushed the cgardens/serialized-message-consumer branch from 4ccd5c7 to ac71c2d Compare May 27, 2023 06:14
@cgardens cgardens marked this pull request as ready for review May 27, 2023 06:15
@cgardens cgardens requested review from a team as code owners May 27, 2023 06:15
* @throws Exception exception
*/
@Override
void accept(String message, Integer sizeInBytes) throws Exception;
Copy link
Contributor

@davinchia davinchia May 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: should sizeInBytes be a long instead of an int? although seems very unlikely we'd have a record > 2.3GB, not sure we want to bake this assumption into the interface. might be useful for this to be more flexible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't have a strong opinion. i think we said fivetran limits record size to 200MB, so that seems like it gives us a fair amount of buffer. happy to swtich it.

Comment on lines +77 to +78
* If the provided JSON string is invalid AND represents a {@link AirbyteMessage.Type#STATE}
* message, processing is halted. Otherwise, the invalid message is logged and execution continues.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably useful to add more details why i.e. STATE messages represent application state and we want to fail fast to ensure data correctness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah - didn't realise this was moved from an existing class.

}

final Procedure consumeWriteStreamCallable = () -> {
try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for understanding: after this merge, we should be able to get rid of getConsumer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can only get rid of getConsumer once all destinations are migrated over to use the new thing. but for snowflake, yes, in subsequent PRs it overrides this method instead.

throws Exception {
consumer.start();

final byte[] buffer = new byte[8192]; // 8K buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any specific reason to use 8k?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this from somewhere else? looks familiar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8k bytes is the default buffer size for BufferedInputStream

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to what ryan said.

Copy link
Contributor

@davinchia davinchia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit:

I'm not 100% the shim is in the right place. It looks like SwitchingDestination is where this should go. I don't think this is blocking.

Copy link
Contributor

@edgao edgao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the manual system.in parsing scares me. Open to being convinced that I'm being overcautious though.

other than that, agree that this is a noop. had a few comments for my own learning + one nitpick.

@edgao
Copy link
Contributor

edgao commented May 30, 2023

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5125001361
❌ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5125001361
🐛 https://gradle.com/s/kw33vs3dw7ohe

Build Failed

Test summary info:

Could not find result summary

@edgao
Copy link
Contributor

edgao commented May 30, 2023

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5125003045
❌ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5125003045
🐛 https://gradle.com/s/7ud3wmlloai7i

Build Failed

Test summary info:

Could not find result summary

…integrations/base/Destination.java

Co-authored-by: Edward Gao <edward.gao@airbyte.io>
…integrations/base/Destination.java

Co-authored-by: Davin Chia <davinchia@gmail.com>
}
};

watchForOrphanThreads(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for context, we moved this because on master it is getting called before the close is called on the destination. this doesn't make sense, because it is checking if the destination cleaned itself up properly. that's a ridiculous thing to change before you let the destination call close. now this only gets called after the destination calls close

…integrations/base/IntegrationRunner.java

Co-authored-by: Davin Chia <davinchia@gmail.com>
Copy link
Contributor

@evantahler evantahler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of my comments are blocking, but the /test seems to be failing:

BigQueryStandardDestinationAcceptanceTest > testCustomDbtTransformations() FAILED
    io.airbyte.workers.exception.TestHarnessException: dbt debug Failed.
        at app//io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest.testCustomDbtTransformations(DestinationAcceptanceTest.java:967)

So something isn't working quite right

Comment on lines +115 to +119
if (isStateMessage(inputString)) {
throw new IllegalStateException("Invalid state message: " + inputString);
} else {
log.error("Received invalid message: " + inputString);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I believe that all other types of messages that a source might emit (log, control, etc) should not be passed to the destination, so STATE messages are the only other types to check for which are meaningful

*/
@SuppressWarnings("OptionalIsPresent")
private static boolean isStateMessage(final String input) {
final Optional<AirbyteTypeMessage> deserialized = Jsons.tryDeserialize(input, AirbyteTypeMessage.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you tested this, but just for my own understanding, how does this work? Since there's only one top-level property, with the JSON parser only deser the top level, leaving RECORD: {} as a string?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @davinchia remember when we tried this in mexico with just string.contains()? this is much better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah exactly. it just deserializes type and ignores the rest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, jonathan pearlin is the one who set this up. this has been in prod for a while.

while ((bytesRead = bis.read(buffer)) != -1) {
for (int i = 0; i < bytesRead; i++) {
final byte b = buffer[i];
if (b == '\n' || b == '\r') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Any worries about "encoded" newline characters within the records themselves ({body: "hello\nworld"})? Is there a builtin for sytem.newline or something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a test in IntegrationRunnerBackwardsCompatbilityTest.java to make sure the behavior is the same between what we have on master and this branch to make sure we don't mess anything up.

@cgardens
Copy link
Contributor Author

cgardens commented May 30, 2023

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5127201455
❌ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5127201455
🐛 https://gradle.com/s/lgdgr267tryow

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 30, 2023

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5127223755
❌ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5127223755
🐛 https://gradle.com/s/ubyqihx2ubklq

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 30, 2023

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5127225203
❌ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5127225203
🐛 https://gradle.com/s/5idbzkksfbg6k

Build Failed

Test summary info:

Could not find result summary

@cgardens cgardens mentioned this pull request May 30, 2023
@cgardens
Copy link
Contributor Author

running tests on "master" here to compare test results

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5128577294
❌ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5128577294
🐛 https://gradle.com/s/ghiub4y2uymnm

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5128577562
❌ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5128577562
🐛 https://gradle.com/s/swzomntwcby7s

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5128578437
❌ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5128578437
🐛 https://gradle.com/s/w7czo6hmeaglk

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-redshift

🕑 connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5129466717
❌ connectors/destination-redshift https://github.com/airbytehq/airbyte/actions/runs/5129466717
🐛 https://gradle.com/s/gj6zwtnlrmdau

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-snowflake

🕑 connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5129467836
❌ connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/5129467836
🐛 https://gradle.com/s/7hnpp4if4ynyk

Build Failed

Test summary info:

Could not find result summary

@cgardens
Copy link
Contributor Author

cgardens commented May 31, 2023

/test connector=connectors/destination-bigquery

🕑 connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5129468682
❌ connectors/destination-bigquery https://github.com/airbytehq/airbyte/actions/runs/5129468682
🐛 https://gradle.com/s/7gwdy6c7eachu

Build Failed

Test summary info:

Could not find result summary

@cgardens cgardens merged commit 567f839 into master May 31, 2023
@cgardens cgardens deleted the cgardens/serialized-message-consumer branch May 31, 2023 15:14
marcosmarxm pushed a commit to natalia-miinto/airbyte that referenced this pull request Jun 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants