
Add Coordinator Layer and Java Coordinator#65958

Open
jason810496 wants to merge 61 commits into apache:main from astronomer:task-sdk/feature/coordinator-interface


@jason810496 jason810496 commented Apr 27, 2026

Add Coordinator Layer and Java Coordinator

  1. Add Java SDK #65956
  2. Add Coordinator Layer and Java Coordinator #65958 (this PR)
  3. Add CI, E2E Tests, and Pre-commit Hooks for Java SDK #65959
  • Try it out: A combined PoC branch with all changes cherry-picked is available at [DON'T MERGE] Java SDK All #65960 for reviewers who want to test the full integration end-to-end.

Why

Airflow's DAG file processor and task runner only understand Python. To run DAGs and tasks authored in other languages (Java now, Go/Rust later), both the parsing pipeline and the execution pipeline need a language-agnostic extension point that delegates to an external runtime subprocess.

How

The Coordinator Abstraction

A new BaseCoordinator base class in the Task SDK (task-sdk/src/airflow/sdk/execution_time/coordinator.py) defines the extension point. Language providers subclass it and implement three methods:

| Method | Purpose |
| --- | --- |
| can_handle_dag_file(bundle_name, path) | File discovery (e.g., "is this a valid JAR that we can parse?") |
| dag_parsing_runtime_cmd(...) | Returns the subprocess command for DAG parsing |
| task_execution_runtime_cmd(...) | Returns the subprocess command for task execution |
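
To make the shape of the extension point concrete, here is a minimal sketch of a coordinator interface and one subclass. It is not the PR's `BaseCoordinator`: the class names `CoordinatorSketch` and `EchoCoordinator`, the keyword parameters of the two `*_runtime_cmd` methods (the description elides them as `...`), the `.echo` extension, and the `echo-runtime` binary are all assumptions made for illustration.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path


class CoordinatorSketch(ABC):
    """Hypothetical stand-in for BaseCoordinator; signatures are assumed."""

    @abstractmethod
    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        """Return True if this coordinator should parse the given file."""

    @abstractmethod
    def dag_parsing_runtime_cmd(self, *, dag_file: str, comm: str, logs: str) -> list[str]:
        """Return the subprocess command used for DAG parsing."""

    @abstractmethod
    def task_execution_runtime_cmd(self, *, dag_file: str, comm: str, logs: str) -> list[str]:
        """Return the subprocess command used for task execution."""


class EchoCoordinator(CoordinatorSketch):
    """Toy coordinator for an imaginary 'echo' runtime (illustration only)."""

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        # File discovery by extension; a real check may inspect file contents.
        return Path(path).suffix == ".echo"

    def dag_parsing_runtime_cmd(self, *, dag_file: str, comm: str, logs: str) -> list[str]:
        return ["echo-runtime", "parse", dag_file, f"--comm={comm}", f"--logs={logs}"]

    def task_execution_runtime_cmd(self, *, dag_file: str, comm: str, logs: str) -> list[str]:
        return ["echo-runtime", "run", dag_file, f"--comm={comm}", f"--logs={logs}"]
```

A provider would only need to supply these three methods; everything about spawning and wiring the subprocess stays in the base class.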

The base class owns the full subprocess lifecycle: TCP server creation, subprocess spawning, connection acceptance, and a selector-based byte-forwarding bridge between the Airflow supervisor (fd 0) and the language runtime (TCP socket). The shared I/O loop is extracted into selector_loop.py and reused by WatchedSubprocess.
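
The lifecycle described above (TCP server creation, subprocess spawning, connection acceptance, selector-based forwarding) can be sketched with the standard library alone. This is a simplified illustration, not the code in this PR: the child process is a small Python script standing in for a language runtime, forwarding runs in one direction only, and the bytes land in an in-memory buffer rather than on the supervisor's fd 0. The names `run_bridge` and `CHILD_CODE` are hypothetical.

```python
import selectors
import socket
import subprocess
import sys

# Stand-in for a language runtime: connect back to the given port, send bytes.
CHILD_CODE = """
import socket, sys
with socket.create_connection(("127.0.0.1", int(sys.argv[1]))) as s:
    s.sendall(b"hello from runtime")
"""


def run_bridge(timeout: float = 10.0) -> bytes:
    # 1. TCP server creation on an ephemeral loopback port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    server.settimeout(timeout)
    port = server.getsockname()[1]

    # 2. Subprocess spawning: the runtime learns the port from its arguments.
    proc = subprocess.Popen([sys.executable, "-c", CHILD_CODE, str(port)])

    # 3. Connection acceptance.
    try:
        conn, _addr = server.accept()
    except OSError:
        proc.kill()
        raise
    finally:
        server.close()

    # 4. Selector-based forwarding until the runtime closes its end.
    forwarded = bytearray()  # stands in for the supervisor side
    sel = selectors.DefaultSelector()
    sel.register(conn, selectors.EVENT_READ)
    try:
        while sel.get_map():
            events = sel.select(timeout=timeout)
            if not events:
                break  # nothing arrived within the timeout; give up
            for key, _mask in events:
                chunk = key.fileobj.recv(4096)
                if chunk:
                    forwarded.extend(chunk)
                else:
                    sel.unregister(key.fileobj)  # EOF from the runtime
    finally:
        sel.close()
        conn.close()
        proc.wait(timeout=timeout)
    return bytes(forwarded)
```

A full bridge would register both endpoints with the selector and copy in both directions; the single-direction loop here only shows the control flow.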

Discovery and Routing

Providers register coordinators in provider.yaml under a new coordinators key. ProvidersManager (airflow-core) and ProvidersManagerTaskRuntime (task-sdk) both discover them:

  • DAG Parsing: DagFileProcessorProcess._resolve_processor_target() iterates registered coordinators — the first whose can_handle_dag_file() returns True handles the file.
  • Task Execution: task_runner._resolve_runtime_entrypoint() uses a two-step resolution: first it consults the [sdk] queue_to_sdk mapping (queue name to coordinator runtime name), then it falls back to matching DAG file extensions against registered coordinators.
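
The two routing rules above can be sketched as plain functions. This is an illustration under assumptions, not the bodies of `_resolve_processor_target()` or `_resolve_runtime_entrypoint()`: `FakeCoordinator`, its `runtime_name` and `extensions` fields, and both function names are hypothetical, and returning `None` is assumed to mean "use the regular Python path".

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class FakeCoordinator:
    """Hypothetical coordinator record used only for this sketch."""

    runtime_name: str
    extensions: tuple[str, ...]

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        return Path(path).suffix in self.extensions


def resolve_for_parsing(coordinators, bundle_name: str, path: str):
    """DAG parsing: the first coordinator that accepts the file wins."""
    for coordinator in coordinators:
        if coordinator.can_handle_dag_file(bundle_name, path):
            return coordinator
    return None  # assumed to mean: fall through to the Python processor


def resolve_for_execution(coordinators, queue_to_sdk: dict, queue: str, dag_path: str):
    """Task execution: queue mapping first, then file-extension fallback."""
    runtime = queue_to_sdk.get(queue)
    if runtime is not None:
        for coordinator in coordinators:
            if coordinator.runtime_name == runtime:
                return coordinator
    suffix = Path(dag_path).suffix
    for coordinator in coordinators:
        if suffix in coordinator.extensions:
            return coordinator
    return None  # assumed to mean: run with the Python task runner
```

In this sketch a queue that maps to an unregistered runtime name silently falls through to the extension check; whether the PR treats that case as an error is not stated in the description.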

Queue-Based Runtime Routing

Tasks are routed to non-Python runtimes via their queue assignment and a configuration mapping. Operators set queue="java-queue" (or any custom queue name), and the [sdk] queue_to_sdk config maps queue names to coordinator runtime names:

[sdk]
queue_to_sdk = {"java-queue": "java"}

This avoids adding new columns or API fields -- the existing queue field carries the routing signal from scheduling to execution, and the mapping is resolved at task execution time.
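
As a rough illustration of how such a value could be read, the sketch below parses the `[sdk] queue_to_sdk` option as a JSON object. It uses the standard library's `configparser` as a stand-in for Airflow's own configuration object, and `load_queue_to_sdk` is a hypothetical helper, not the `QueueToCoordinatorMapper` added by this PR.

```python
from __future__ import annotations

import configparser
import json

CONFIG_TEXT = """
[sdk]
queue_to_sdk = {"java-queue": "java"}
"""


def load_queue_to_sdk(text: str) -> dict[str, str]:
    """Parse the queue-to-runtime mapping from INI text (illustrative only)."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    # A missing section or option is treated as an empty mapping.
    raw = parser.get("sdk", "queue_to_sdk", fallback="{}")
    mapping = json.loads(raw)
    if not isinstance(mapping, dict):
        raise ValueError("queue_to_sdk must be a JSON object")
    return {str(queue): str(runtime) for queue, runtime in mapping.items()}


mapping = load_queue_to_sdk(CONFIG_TEXT)
# Tasks on "java-queue" resolve to the "java" runtime; other queues are unmapped.
runtime_for_java_queue = mapping.get("java-queue")
runtime_for_default_queue = mapping.get("default")
```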

Java Provider

A new apache-airflow-providers-sdk-java provider implements JavaCoordinator:

  • can_handle_dag_file: checks if the file is a JAR with valid Airflow Java SDK manifest attributes
  • dag_parsing_runtime_cmd: constructs java -classpath <bundle>/* <MainClass> --comm=... --logs=...
  • task_execution_runtime_cmd: handles both pure Java DAGs (JAR path) and Python stub DAGs (resolves bundle from [java] bundles_folder config)
  • get_code_from_file: extracts embedded .java source from the JAR for Airflow UI display
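
A minimal sketch of the first two bullets follows. A JAR is a ZIP archive with a `META-INF/MANIFEST.MF` entry, so the check can be written with `zipfile`. The manifest attribute name `Airflow-Java-SDK-Version` is an assumption (the description does not name the attributes), the parser reads only the manifest's main section and ignores continuation lines, and the function names are hypothetical rather than the methods of `JavaCoordinator`. The `comm` and `logs` values are passed through as given, since the description elides them.

```python
from __future__ import annotations

import zipfile

# Hypothetical marker attribute; the real attribute names are not given above.
MARKER_ATTRIBUTE = "Airflow-Java-SDK-Version"


def read_main_manifest(jar_path: str) -> dict[str, str]:
    """Read the main section of META-INF/MANIFEST.MF (simplified parser)."""
    with zipfile.ZipFile(jar_path) as jar:
        text = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
    attrs: dict[str, str] = {}
    for line in text.splitlines():
        if not line.strip():
            if attrs:
                break  # a blank line ends the main section
            continue
        if line.startswith(" "):
            continue  # continuation line, ignored in this sketch
        name, sep, value = line.partition(": ")
        if sep:
            attrs[name] = value
    return attrs


def can_handle_dag_file(path: str) -> bool:
    """True for a JAR whose manifest carries the assumed marker attribute."""
    if not path.endswith(".jar") or not zipfile.is_zipfile(path):
        return False
    try:
        attrs = read_main_manifest(path)
    except KeyError:  # archive has no manifest entry
        return False
    return MARKER_ATTRIBUTE in attrs and "Main-Class" in attrs


def dag_parsing_cmd(bundle_dir: str, main_class: str, comm: str, logs: str) -> list[str]:
    """Mirror the command shape quoted above: java -classpath <bundle>/* <MainClass> ..."""
    return ["java", "-classpath", f"{bundle_dir}/*", main_class, f"--comm={comm}", f"--logs={logs}"]
```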

What

Task SDK (task-sdk/)

  • Add BaseCoordinator abstract base class with full subprocess bridge lifecycle
  • Add selector_loop.py — shared selector-based I/O utilities, refactored out of supervisor.py
  • Add _resolve_runtime_entrypoint() to task_runner.py with queue-based and file-extension-based dispatch
  • Add QueueToCoordinatorMapper for resolving queue names to coordinators via [sdk] queue_to_sdk config
  • Extract resolve_bundle() helper for reuse by both Python and coordinator paths
  • Register coordinators discovery in ProvidersManagerTaskRuntime

Airflow Core (airflow-core/)

  • Add [sdk] queue_to_sdk configuration option for queue-to-runtime mapping
  • Extend DagFileProcessorProcess.start() with _resolve_processor_target() for coordinator delegation
  • Extend DagFileProcessorManager to recognize runtime file extensions (e.g., .jar) and skip ZIP inspection for them
  • Extend DagCode.get_code_from_file() to delegate to coordinator's get_code_from_file()
  • Add coordinators extension point to provider.yaml.schema.json and provider_info.schema.json
  • Register coordinators discovery in ProvidersManager

Java Provider (providers/sdk/java/)

  • Add JavaCoordinator with DAG parsing, task execution, and code extraction
  • Add BundleScanner for JAR manifest inspection and bundle resolution
  • Add provider.yaml with coordinators registration and [java] bundles_folder config
  • Add provider packaging (pyproject.toml, docs, LICENSE, NOTICE)
  • Add java_sdk_setup.sh for Breeze development environment

Was generative AI tooling used to co-author this PR?

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

@jason810496 jason810496 removed the backport-to-v3-2-test label Apr 27, 2026
@jason810496 jason810496 self-assigned this Apr 27, 2026
@uranusjr uranusjr added the AIP-108: java-sdk and AIP-108: Coordinator labels Apr 28, 2026
Comment thread task-sdk/src/airflow/sdk/definitions/mappedoperator.py Outdated
@jason810496 jason810496 force-pushed the task-sdk/feature/coordinator-interface branch from 688d569 to 59d5a47 Compare April 28, 2026 11:02
Comment thread airflow-core/docs/extra-packages-ref.rst Outdated
Comment thread airflow-core/src/airflow/config_templates/config.yml Outdated
Comment thread airflow-core/src/airflow/dag_processing/manager.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/workloads/task.py
Comment thread task-sdk/src/airflow/sdk/execution_time/coordinator.py Outdated
@jason810496 jason810496 force-pushed the task-sdk/feature/coordinator-interface branch 6 times, most recently from d2f28c8 to 52dcb2a Compare April 30, 2026 14:33

@uranusjr uranusjr left a comment


Same for many other tests

Comment thread task-sdk/tests/task_sdk/execution_time/test_task_runner.py
* CI: Fix uv.lock by removing coordinator distribution

* Remove the remaining entry caught by the Codex bot
Comment thread task-sdk/src/airflow/sdk/coordinators/java/coordinator.py Fixed
uranusjr added 2 commits May 19, 2026 18:32
A JavaCoordinator can now accept more than one path to look for JARs.
This path is also used to populate --class-path when executing the task,
so you can now split dependencies and the task-containing JAR into
different locations. This should make deployment a bit easier.

The already-unused BundleScanner class has been removed. This was for
Java-based DAGs, and not used in the current specification.
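
One way to picture the multi-path behaviour described in this commit message is sketched below. It is an illustration only: `build_class_path` and `task_cmd` are hypothetical helpers, and the sketch assumes each configured path is a directory whose JARs are picked up with the JVM's `dir/*` class-path wildcard.

```python
from __future__ import annotations

import os


def build_class_path(jar_dirs: list[str]) -> str:
    """Join each directory's wildcard entry with the platform path separator."""
    # os.pathsep is ':' on POSIX and ';' on Windows, matching the java launcher.
    return os.pathsep.join(os.path.join(directory, "*") for directory in jar_dirs)


def task_cmd(jar_dirs: list[str], main_class: str) -> list[str]:
    """Assemble a java invocation with the combined class path."""
    return ["java", "--class-path", build_class_path(jar_dirs), main_class]
```

With this shape, dependency JARs and the task-containing JAR can live in separate directories, for example `task_cmd(["/opt/deps", "/opt/tasks"], "com.example.Main")`.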
jason810496 and others added 3 commits May 20, 2026 14:40
* Drop selector_loop module; new JavaCoordinator does not need it

The rewritten ``JavaCoordinator`` (``airflow.sdk.coordinators.java``) lets
the JVM connect directly to two listening sockets (``comm`` and ``logs``)
and uses the accepted sockets as the supervisor's ``stdin`` / log pipes
straight up. There is no bytes-bridge between two sockets, so
``make_raw_forwarder`` -- the only helper the extracted ``selector_loop``
module added beyond what was already inline in ``supervisor.py`` -- has
no caller.

``_JavaActivitySubprocess`` reuses ``_register_pipe_readers`` and
``_close_unused_sockets`` via subclassing ``ActivitySubprocess``; both
methods existed before the extraction and remain in ``supervisor.py``
after this revert. The inline selector dispatch loop and
``make_buffered_socket_reader`` come back into ``supervisor.py`` so the
existing call sites (including the ``triggerer_job_runner`` re-export)
keep working unchanged.

This reverts commit 56464a8 ("Add common
selector loop utilities for socket I/O handling for subprocesses") and
deletes ``test_selector_loop.py`` introduced by 4b80753.

* Address Copilot's comments
With foreign-language SDKs, the two sides of the supervisor comm channel may
run different schema versions. This adds a migration layer on the supervisor
(server) side, so that an SDK (client) using an older version of the schema
may still be able to communicate with the server.
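
A server-side migration layer of this kind is often built as a chain of single-step upgrades. The sketch below shows that general pattern only; it is not the code added by this commit. The version numbers, the field names `ti_id`, `task_instance_id`, and `try_number`, and every function name are invented for illustration.

```python
from __future__ import annotations

from typing import Callable

CURRENT_VERSION = 3  # hypothetical server schema version


def _v1_to_v2(msg: dict) -> dict:
    # Invented change: a field was renamed between v1 and v2.
    msg = dict(msg)
    msg["task_instance_id"] = msg.pop("ti_id")
    return msg


def _v2_to_v3(msg: dict) -> dict:
    # Invented change: v3 added a field, so older clients get a default.
    msg = dict(msg)
    msg.setdefault("try_number", 1)
    return msg


# Each entry upgrades a message from version N to version N + 1.
MIGRATIONS: dict[int, Callable[[dict], dict]] = {1: _v1_to_v2, 2: _v2_to_v3}


def upgrade(msg: dict, client_version: int) -> dict:
    """Bring a client message up to the server's schema, one step at a time."""
    if client_version > CURRENT_VERSION:
        raise ValueError(f"client schema {client_version} is newer than server {CURRENT_VERSION}")
    version = client_version
    while version < CURRENT_VERSION:
        msg = MIGRATIONS[version](msg)
        version += 1
    return msg
```

Responses sent back to an older client would need the reverse (downgrade) chain, which this sketch omits.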
@uranusjr

I went through the AIP and it seems like this is up-to-date now. It still needs to wait until the AIP passes, but I think we can do reviews in the meantime.


Labels

AIP-108: Coordinator, AIP-108: java-sdk, area:ConfigTemplates, area:DAG-processing, area:dev-tools, area:Executors-core, area:providers, area:task-sdk, kind:documentation, provider:standard


8 participants