[BEAM-8471] Flink native job submission for portable pipelines#9872
[BEAM-8471] Flink native job submission for portable pipelines#9872tweise merged 1 commit intoapache:masterfrom
Conversation
|
R: @chadrik |
|
Thanks for the heads up. I don't have any input on this, but I do like staying in the loop on this subject! |
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
We may want to make this executable configurable, for non bash users. How about putting executable and args in the config with the current defaults?
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientRunner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
| this.driverCmd = driverCmd; | |
| Preconditions.checkState(!driverCmd.contains(DRIVER_CMD_FLAGS), "Driver command must not contain "+ DRIVER_CMD_FLAGS); | |
| this.driverCmd = driverCmd; |
There was a problem hiding this comment.
| private static final String DRIVER_CMD_FLAGS = "--job_endpoint=%s"; | |
| private static final String JOB_ENDPOINT_FLAG = "--job_endpoint"; |
There was a problem hiding this comment.
This allows us to check for the args in the user command string. The %s would have to be appended in the format string.
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
@mxm any preference regarding class name? I first had it as FlinkPortableClientRunner but maybe "runner" is misleading. On the other hand, "entry point" is also redundant.
There was a problem hiding this comment.
How about just FlinkPortableRunner? The functionality is closest to the classic FlinkRunner, as it allows for any portable pipeline to be submitted.
There was a problem hiding this comment.
I had it as XYZRunner originally, but figured it will cause confusion. There are FlinkPipelineRunner and PortablePipelineRunner already (these are used within the job server).
There was a problem hiding this comment.
How about just FlinkPortableRunner? The functionality is closest to the classic FlinkRunner, as it allows for any portable pipeline to be submitted.
There was a problem hiding this comment.
@mxm I see the inherit working as the expected output is printed in the console. But short of starting a separate java process I don't see a way to capture it in the test.
There was a problem hiding this comment.
I left the test but skipped the assertion, maybe we can still find a clever way to do this.
3b5dce8 to
f89c1b9
Compare
…program. (#27) Upstream PR: apache#9872
Add a new Flink entry point (main method) that invokes the external SDK client entry point to generate the pipeline and submits the resulting Flink job like any other Flink native driver program would, via the optimizer plan environment ("[auto]").
Note that in this PR, the SDK client is assumed to be on the same host, which is the case when Flink and Python dependencies are in the same container image, for example. While this is something that can be solved at build time, the question from Robert made me realize that the dependency is almost identical to that between the runner and SDK on the execution side, which is abstracted via the environment concept. So we could, in the future, consider introducing a "client environment" or simply expand the existing environment. This would allow the SDK bits for both pipeline construction and execution to live in a side car container, separate from Flink.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username).[BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replaceBEAM-XXXwith the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.