-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bulk-cdk: improve AirbyteConnectorRunner and CliRunner #45374
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt
Outdated
Show resolved
Hide resolved
4cc7ac9
to
dd954be
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
had a question in CliRunner about how to use this interface, plus a few nits
airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt
Show resolved
Hide resolved
airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt
Outdated
Show resolved
Hide resolved
airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/command/CliRunner.kt
Outdated
Show resolved
Hide resolved
dd954be
to
36b042b
Compare
This is ready for another look. I added the missing functionality for async running. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checking my understanding: there's two ways to use this asyncly:
val dest = CliRunner.destination(pipeInput)
launch { dest.run() }
sendMessages(pipe)
inspect(dest.results.stuff())
or via callback
CliRunner.destination(byteArrayInput)
.withCallback({ msg -> ... })
.run()
lgtm assuming I'm on the right track!
Yes indeed! The callback is required if you want to interrupt the sync sooner than normal operation. Something which I didn't explore (because I felt it was out of scope for this PR) but which would make sense is to use channels instead of a |
What
The
CliRunner
dumps all output into a temp file and re-reads it into aBufferingOutputConsumer
. This works, but it requires an edge case in the CLI grammar defined inAirbyteConnectorRunner
. @edgao found a much better way in #45113 involvingRuntimeBeanDefinition
s. This PR implements this change and removes the--output
CLI argument support. It also proves a nice entry point for destination input messages in theCliRunner
.How
The
CliRunner
overrides thePrintStream
that gets injected in theStdoutOutputConsumer
. By default it's aPrintStream
which wrapsSystem.out
but for integration tests we want theStdoutOutputConsumer
to feed into aBufferingOutputConsumer
.Same goes for
InputStream
for destinations.Review guide
n/a
User Impact
None, test-only change.
Can this PR be safely reverted and rolled back?