Add Coordinator Layer and Java Coordinator#65958
Open
jason810496 wants to merge 61 commits into
Open
Conversation
This was referenced Apr 27, 2026
Draft
uranusjr
reviewed
Apr 28, 2026
688d569 to
59d5a47
Compare
jscheffl
reviewed
Apr 28, 2026
jscheffl
reviewed
Apr 28, 2026
jscheffl
reviewed
Apr 28, 2026
jscheffl
reviewed
Apr 28, 2026
jscheffl
reviewed
Apr 28, 2026
d2f28c8 to
52dcb2a
Compare
uranusjr
reviewed
May 2, 2026
* CI: Fix uv.lock by removing coordinator distribution * Remove the remaining entry catch by codex bot
f8a67d2 to
6f10fc7
Compare
1 task
A JavaCoordinator can now accept more than one path to look for JARs. This path is also used to populate --class-path when executing the task, so you can now split dependencies and the task-containing JAR into different locations. This should make deployment a bit easier. The already-unused BundleScanner class has been removed. This was for Java-based DAGs, and not used in the current specification.
This was referenced May 19, 2026
16332b2 to
e4b5125
Compare
* Drop selector_loop module; new JavaCoordinator does not need it The rewritten ``JavaCoordinator`` (``airflow.sdk.coordinators.java``) lets the JVM connect directly to two listening sockets (``comm`` and ``logs``) and uses the accepted sockets as the supervisor's ``stdin`` / log pipes straight up. There is no bytes-bridge between two sockets, so ``make_raw_forwarder`` -- the only helper the extracted ``selector_loop`` module added beyond what was already inline in ``supervisor.py`` -- has no caller. ``_JavaActivitySubprocess`` reuses ``_register_pipe_readers`` and ``_close_unused_sockets`` via subclassing ``ActivitySubprocess``; both methods existed before the extraction and remain in ``supervisor.py`` after this revert. The inline selector dispatch loop and ``make_buffered_socket_reader`` come back into ``supervisor.py`` so the existing call sites (including the ``triggerer_job_runner`` re-export) keep working unchanged. This reverts commit 56464a8 ("Add common selector loop utilities for socket I/O handling for subprocesses") and deletes ``test_selector_loop.py`` introduced by 4b80753. * Address copilot's comments
With foreign language SDKs, it may be possible the two sides of supervisor comm have different versions. This adds a migration layer at the supervisor (server) side, so an SDK (client) using a lower version of the schema may be able to communicate to the server.
Member
|
I went through the AIP and it seems like this is up-to-date now. It still needs to wait until the AIP passes, but I think we can do reviews in the mean time. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Add Coordinator Layer and Java Coordinator
Why
Airflow's DAG file processor and task runner only understand Python. To run DAGs and tasks authored in other languages (Java now, Go/Rust later), both the parsing pipeline and the execution pipeline need a language-agnostic extension point that delegates to an external runtime subprocess.
How
The Coordinator Abstraction
A new
BaseCoordinatorbase class in the Task SDK (task-sdk/src/airflow/sdk/execution_time/coordinator.py) defines the extension point. Language providers subclass it and implement three methods:can_handle_dag_file(bundle_name, path)dag_parsing_runtime_cmd(...)task_execution_runtime_cmd(...)The base class owns the full subprocess lifecycle: TCP server creation, subprocess spawning, connection acceptance, and a selector-based byte-forwarding bridge between the Airflow supervisor (fd 0) and the language runtime (TCP socket). The shared I/O loop is extracted into
selector_loop.pyand reused byWatchedSubprocess.Discovery and Routing
Providers register coordinators in
provider.yamlunder a newcoordinatorskey.ProvidersManager(airflow-core) andProvidersManagerTaskRuntime(task-sdk) both discover them:DagFileProcessorProcess._resolve_processor_target()iterates registered coordinators — the first whosecan_handle_dag_file()returnsTruehandles the file.task_runner._resolve_runtime_entrypoint()uses a two-step resolution: first it consults the[sdk] queue_to_sdkmapping (queue name to coordinator runtime name), then it falls back to matching DAG file extensions against registered coordinators.Queue-Based Runtime Routing
Tasks are routed to non-Python runtimes via their
queueassignment and a configuration mapping. Operators setqueue="java-queue"(or any custom queue name), and the[sdk] queue_to_sdkconfig maps queue names to coordinator runtime names:This avoids adding new columns or API fields -- the existing
queuefield carries the routing signal from scheduling to execution, and the mapping is resolved at task execution time.Java Provider
A new
apache-airflow-providers-sdk-javaprovider implementsJavaCoordinator:can_handle_dag_file: checks if the file is a JAR with valid Airflow Java SDK manifest attributesdag_parsing_runtime_cmd: constructsjava -classpath <bundle>/* <MainClass> --comm=... --logs=...task_execution_runtime_cmd: handles both pure Java DAGs (JAR path) and Python stub DAGs (resolves bundle from[java] bundles_folderconfig)get_code_from_file: extracts embedded.javasource from the JAR for Airflow UI displayWhat
Task SDK (
task-sdk/)BaseCoordinatorabstract base class with full subprocess bridge lifecycleselector_loop.py— shared selector-based I/O utilities, refactored out ofsupervisor.py_resolve_runtime_entrypoint()totask_runner.pywith queue-based and file-extension-based dispatchQueueToCoordinatorMapperfor resolving queue names to coordinators via[sdk] queue_to_sdkconfigresolve_bundle()helper for reuse by both Python and coordinator pathscoordinatorsdiscovery inProvidersManagerTaskRuntimeAirflow Core (
airflow-core/)[sdk] queue_to_sdkconfiguration option for queue-to-runtime mappingDagFileProcessorProcess.start()with_resolve_processor_target()for coordinator delegationDagFileProcessorManagerto recognize runtime file extensions (e.g.,.jar) and skip ZIP inspection for themDagCode.get_code_from_file()to delegate to coordinator'sget_code_from_file()coordinatorsextension point toprovider.yaml.schema.jsonandprovider_info.schema.jsoncoordinatorsdiscovery inProvidersManagerJava Provider (
providers/sdk/java/)JavaCoordinatorwith DAG parsing, task execution, and code extractionBundleScannerfor JAR manifest inspection and bundle resolutionprovider.yamlwithcoordinatorsregistration and[java] bundles_folderconfigpyproject.toml, docs, LICENSE, NOTICE)java_sdk_setup.shfor Breeze development environmentWas generative AI tooling used to co-author this PR?
Co-authored-by: Tzu-ping Chung uranusjr@gmail.com