4 changes: 3 additions & 1 deletion providers/edge3/docs/architecture.rst
@@ -150,7 +150,9 @@ The current version of the EdgeExecutor is released with known limitations. It w

The following features are known missing and will be implemented in increments:

- API token per worker: Today there is a global API token available only
- API token per worker / per team: Today only a global API token is available. This means
  that multi-team isolation (see :ref:`edge_executor:multi_team`) is logical only and does not
  provide a security boundary between teams.
- Edge Worker Plugin

- Overview about queues / jobs per queue
24 changes: 23 additions & 1 deletion providers/edge3/docs/deployment.rst
@@ -102,7 +102,11 @@ to apply the edge3 schema migrations:
airflow db migrate

To kick off a worker, you need to set up Airflow and kick off the worker
subcommand
subcommand.

If you are using multi-team setups, you can assign the worker to a specific team
using the ``--team-name`` option. See :ref:`edge_executor:multi_team` for more details
on team-based isolation.

.. code-block:: bash

@@ -122,6 +126,12 @@ subcommand
    2025-09-27T12:28:33.171525Z [info ] No new job to process


To start a worker assigned to a specific team:

.. code-block:: bash

    airflow edge worker --team-name team_a -q remote,wisconsin_site

You can also start this worker in the background by running
it as a daemonized process. Additionally, you can redirect stdout
and stderr to their respective files.
@@ -240,3 +250,15 @@ instance. The commands are:
- ``airflow edge remove-remote-edge-worker``: Remove a worker instance from the cluster
- ``airflow edge add-worker-queues``: Add queues to an edge worker
- ``airflow edge remove-worker-queues``: Remove queues from an edge worker

In multi-team setups, several of these commands accept a ``--team-name`` option to
target workers belonging to a specific team. Refer to the :doc:`cli-ref` for the
full list of arguments.

.. important::

    If a worker belongs to a team, you **must** specify the correct ``--team-name`` when
    using worker-related commands or APIs that target it. The worker is identified by the
    combination of its hostname and team name, so if the team name is omitted or incorrect,
    the command will not find the worker. If the worker does not belong to
    any team, ``--team-name`` can be omitted.
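
To illustrate why the team name matters for lookups, here is a minimal sketch that models the worker registry as a dictionary keyed by ``(hostname, team_name)``. The ``REGISTRY`` contents and the ``find_worker`` helper are hypothetical names used only for illustration; they mirror the identification rule described above and are not the provider's actual lookup code.

```python
from __future__ import annotations

# Hypothetical registry: each worker is keyed by (hostname, team_name).
# A worker that belongs to no team uses None as its team name.
REGISTRY: dict[tuple[str, str | None], str] = {
    ("edge-host-1", "team_a"): "idle",
    ("edge-host-2", None): "running",
}


def find_worker(hostname: str, team_name: str | None = None) -> str | None:
    """Return the worker state, or None if no worker matches both parts of the key."""
    return REGISTRY.get((hostname, team_name))


print(find_worker("edge-host-1", "team_a"))  # found: correct team name
print(find_worker("edge-host-1"))            # None: team name omitted
print(find_worker("edge-host-1", "team_b"))  # None: wrong team name
print(find_worker("edge-host-2"))            # found: worker has no team
```

Because the team name is part of the key, a lookup by hostname alone misses a worker that was registered with a team.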
49 changes: 49 additions & 0 deletions providers/edge3/docs/edge_executor.rst
@@ -59,6 +59,53 @@ When using EdgeExecutor in addition to other executors and EdgeExecutor not bein
as the executor at task or Dag level in addition to the queues you are targeting.
For more details on multiple executors please see :ref:`apache-airflow:using-multiple-executors-concurrently`.

.. _edge_executor:multi_team:

Multi-Team Support
------------------

The EdgeExecutor supports multi-team setups (AIP-67), allowing multiple ``EdgeExecutor`` instances
to run concurrently within the same scheduler process with team-level isolation.

Each executor instance and worker can be associated with a ``team_name``. When set, the executor
only operates on jobs and workers belonging to its team, and workers only fetch jobs assigned to
their team. This enables multiple teams to share the same Airflow infrastructure while keeping
their edge workloads isolated.

**How it works:**

- The ``team_name`` is stored as a column on both the ``edge_job`` and ``edge_worker`` database tables.
- An executor with ``team_name="team_a"`` only queues, purges, and monitors jobs/workers tagged
  with ``team_name="team_a"``.
- A worker started with ``--team-name team_a`` only fetches jobs assigned to ``team_a``.
- When ``team_name`` is not set (the default), the executor and worker operate without team
  isolation, maintaining full backward compatibility with existing single-team deployments.
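
The filtering rules above can be sketched with a small in-memory model. The ``Job`` dataclass and the ``fetch_jobs_for_worker`` helper are hypothetical names; the real provider filters rows of the ``edge_job`` table, and that query is not shown in this change. The sketch also assumes that a worker without a team matches only jobs without a team, which the text above does not spell out.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Job:
    """Hypothetical stand-in for a row of the edge_job table."""

    key: str
    queue: str
    team_name: str | None = None


def fetch_jobs_for_worker(
    jobs: list[Job], team_name: str | None, queues: list[str] | None = None
) -> list[Job]:
    """Keep jobs whose team matches the worker's team and whose queue is served.

    Assumption: team_name=None matches only jobs that carry no team.
    queues=None means the worker serves all queues.
    """
    return [
        job
        for job in jobs
        if job.team_name == team_name and (queues is None or job.queue in queues)
    ]


jobs = [
    Job("a", "queue1", "team_a"),
    Job("b", "queue1", "team_b"),
    Job("c", "queue2", "team_a"),
    Job("d", "queue1"),
]
print([job.key for job in fetch_jobs_for_worker(jobs, "team_a")])
print([job.key for job in fetch_jobs_for_worker(jobs, "team_a", ["queue1"])])
```

The team filter is applied in addition to the queue filter, so a ``team_a`` worker never sees the ``team_b`` job even though both serve ``queue1``.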

**Starting a worker with a team name:**

.. code-block:: bash

    airflow edge worker --team-name team_a -q queue1,queue2

**Per-team configuration:**

Each team can have its own configuration overrides via environment variables using the
``AIRFLOW__<TEAM_NAME>___<SECTION>__<KEY>`` pattern. For example, to set a custom heartbeat
interval for ``team_a``:

.. code-block:: bash

    export AIRFLOW__TEAM_A___EDGE__HEARTBEAT_INTERVAL=30
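
As a rough illustration of the naming pattern, the following sketch builds the per-team variable name and resolves a value from a plain mapping. The helpers ``team_env_var`` and ``resolve_setting`` are hypothetical and not part of Airflow. Upper-casing the team name and falling back to the global ``AIRFLOW__<SECTION>__<KEY>`` variable are assumptions made for this example.

```python
from __future__ import annotations

from collections.abc import Mapping


def team_env_var(team_name: str, section: str, key: str) -> str:
    """Build the per-team name: AIRFLOW__<TEAM_NAME>___<SECTION>__<KEY>.

    Note the three underscores between the team name and the section.
    """
    return f"AIRFLOW__{team_name.upper()}___{section.upper()}__{key.upper()}"


def resolve_setting(
    environ: Mapping[str, str], team_name: str, section: str, key: str
) -> str | None:
    """Assumed precedence: the team override first, then the global variable."""
    global_var = f"AIRFLOW__{section.upper()}__{key.upper()}"
    return environ.get(team_env_var(team_name, section, key), environ.get(global_var))


env = {
    "AIRFLOW__EDGE__HEARTBEAT_INTERVAL": "60",
    "AIRFLOW__TEAM_A___EDGE__HEARTBEAT_INTERVAL": "30",
}
print(team_env_var("team_a", "edge", "heartbeat_interval"))
print(resolve_setting(env, "team_a", "edge", "heartbeat_interval"))
print(resolve_setting(env, "team_b", "edge", "heartbeat_interval"))
```

In this model ``team_a`` picks up its override, while ``team_b`` has no override and falls back to the global value.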

.. warning::

    **Security limitation:** The current multi-team implementation provides logical isolation
    at the database level only. Worker authentication still relies on a single shared secret
    (``jwt_secret``) for all teams. This means a worker could switch its ``team_name`` at
    deployment time and access another team's jobs. Multi-team should therefore be considered
    an organizational separation, not a security boundary. Per-worker or per-team authentication
    tokens are planned for a future release.
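
The limitation can be made concrete with a small sketch. It uses a plain HMAC signature from the Python standard library as a stand-in for JWT signing, so it is not the provider's authentication code. The secrets and the ``sign`` and ``server_accepts`` helpers are hypothetical. The second half shows one possible shape of per-team secrets, purely as an assumption about what such a scheme could look like.

```python
import hashlib
import hmac
import json

SHARED_SECRET = b"one-secret-for-all-teams"  # hypothetical value


def sign(payload: dict, secret: bytes) -> str:
    """Sign the JSON-encoded payload with HMAC-SHA256."""
    body = json.dumps(payload, sort_keys=True).encode()
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def server_accepts(payload: dict, signature: str, secret: bytes) -> bool:
    """The server can only check that the sender knows the secret."""
    return hmac.compare_digest(sign(payload, secret), signature)


# A worker deployed for team_a simply claims team_b in its request.
claim = {"hostname": "edge-host-1", "team_name": "team_b"}
shared_ok = server_accepts(claim, sign(claim, SHARED_SECRET), SHARED_SECRET)
print(shared_ok)  # True: the shared secret says nothing about the team

# Hypothetical alternative: one secret per team, verified against the claimed team.
TEAM_SECRETS = {"team_a": b"secret-of-team-a", "team_b": b"secret-of-team-b"}
forged = sign(claim, TEAM_SECRETS["team_a"])
per_team_ok = server_accepts(claim, forged, TEAM_SECRETS[claim["team_name"]])
print(per_team_ok)  # False: team_a's secret cannot vouch for team_b
```

With one shared secret, a valid signature proves only that the sender is some worker, so the claimed ``team_name`` has to be taken on trust.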

.. _edge_executor:concurrency_slots:

Concurrency slot handling
@@ -115,3 +162,5 @@ Current Limitations Edge Executor
- Performance: No extensive performance assessment and scaling tests have been made. The edge executor package is
  optimized for stability. This will be incrementally improved in future releases. Setups have reported stable
  operation with ~80 workers until now. Note that executed tasks require more api-server / webserver API capacity.
- Multi-team isolation is logical only. All teams share the same authentication secret, so a worker could
  be reconfigured to access another team's jobs. See :ref:`edge_executor:multi_team` for details.
27 changes: 19 additions & 8 deletions providers/edge3/src/airflow/providers/edge3/cli/api_client.py
@@ -112,16 +112,20 @@ async def _make_generic_request(method: str, rest_path: str, data: str | None =


async def worker_register(
    hostname: str, state: EdgeWorkerState, queues: list[str] | None, sysinfo: dict
    hostname: str,
    state: EdgeWorkerState,
    queues: list[str] | None,
    sysinfo: dict,
    team_name: str | None = None,
) -> WorkerRegistrationReturn:
    """Register worker with the Edge API."""
    try:
        result = await _make_generic_request(
            "POST",
            f"worker/{quote(hostname)}",
            WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo).model_dump_json(
                exclude_unset=True
            ),
            WorkerStateBody(
                state=state, jobs_active=0, queues=queues, sysinfo=sysinfo, team_name=team_name
            ).model_dump_json(exclude_unset=True),
        )
    except ClientResponseError as e:
        if e.status == HTTPStatus.BAD_REQUEST:
@@ -142,6 +146,7 @@ async def worker_set_state(
    queues: list[str] | None,
    sysinfo: dict,
    maintenance_comments: str | None = None,
    team_name: str | None = None,
) -> WorkerSetStateReturn:
    """Update the state of the worker in the central site and thereby implicitly heartbeat."""
    try:
@@ -154,6 +159,7 @@ async def worker_set_state(
                queues=queues,
                sysinfo=sysinfo,
                maintenance_comments=maintenance_comments,
                team_name=team_name,
            ).model_dump_json(exclude_unset=True),
        )
    except ClientResponseError as e:
@@ -163,14 +169,19 @@ async def worker_set_state(
    return WorkerSetStateReturn(**result)


async def jobs_fetch(hostname: str, queues: list[str] | None, free_concurrency: int) -> EdgeJobFetched | None:
async def jobs_fetch(
    hostname: str,
    queues: list[str] | None,
    free_concurrency: int,
    team_name: str | None = None,
Review comment (Contributor):

I am thinking a bit about security. We can of course enable multi-team, but at the moment worker authentication is limited to one shared secret for all teams. So while this is technically a separation, in terms of security it is effectively void, because a user can simply switch the team name in the deployment.

This would need to be added to the docs as a restriction.

) -> EdgeJobFetched | None:
    """Fetch a job to execute on the edge worker."""
    result = await _make_generic_request(
        "POST",
        f"jobs/fetch/{quote(hostname)}",
        WorkerQueuesBody(queues=queues, free_concurrency=free_concurrency).model_dump_json(
            exclude_unset=True
        ),
        WorkerQueuesBody(
            queues=queues, free_concurrency=free_concurrency, team_name=team_name
        ).model_dump_json(exclude_unset=True),
    )
    if result:
        return EdgeJobFetched(**result)
30 changes: 26 additions & 4 deletions providers/edge3/src/airflow/providers/edge3/cli/definition.py
@@ -35,6 +35,13 @@
    ("-q", "--queues"),
    help="Comma delimited list of queues to serve, serve all queues if not provided.",
)
ARG_TEAM_NAME = Arg(
    (
        "-t",
        "--team-name",
    ),
    help="Team name for multi-team setups. If not provided, worker operates without team isolation.",
)
ARG_EDGE_HOSTNAME = Arg(
    ("-H", "--edge-hostname"),
    help="Set the hostname of worker if you have multiple workers on a single machine",
@@ -115,6 +122,7 @@
        args=(
            ARG_CONCURRENCY,
            ARG_QUEUES,
            ARG_TEAM_NAME,
            ARG_EDGE_HOSTNAME,
            ARG_PID,
            ARG_VERBOSE,
@@ -163,6 +171,7 @@
        args=(
            ARG_OUTPUT,
            ARG_STATE,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
@@ -172,6 +181,7 @@
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_REQUIRED_MAINTENANCE_COMMENT,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
@@ -180,7 +190,10 @@
        func=lazy_load_command(
            "airflow.providers.edge3.cli.edge_command.remove_remote_worker_from_maintenance"
        ),
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
        name="remote-edge-worker-update-maintenance-comment",
@@ -191,19 +204,26 @@
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_REQUIRED_MAINTENANCE_COMMENT,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
        name="remove-remote-edge-worker",
        help="Remove remote edge worker entry from db.",
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.remove_remote_worker"),
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
        name="shutdown-remote-edge-worker",
        help="Initiate the shutdown of the remote edge worker.",
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.remote_worker_request_shutdown"),
        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
        name="add-worker-queues",
@@ -212,6 +232,7 @@
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_QUEUES_MANAGE,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
@@ -221,13 +242,14 @@
        args=(
            ARG_REQUIRED_EDGE_HOSTNAME,
            ARG_QUEUES_MANAGE,
            ARG_TEAM_NAME,
        ),
    ),
    ActionCommand(
        name="shutdown-all-workers",
        help="Request graceful shutdown of all edge workers.",
        func=lazy_load_command("airflow.providers.edge3.cli.edge_command.shutdown_all_workers"),
        args=(ARG_YES,),
        args=(ARG_YES, ARG_TEAM_NAME),
    ),
]
