AIP 67 - Multi-Team: Update Edge Executor to support multi team#61646

Open
wjddn279 wants to merge 6 commits into apache:main from wjddn279:edge-worker-multi-team

Conversation

@wjddn279
Contributor

@wjddn279 wjddn279 commented Feb 8, 2026

Config isolation

No major issues here. Following the same pattern as other executors (LocalExecutor, CeleryExecutor), all direct reads from the global conf have been replaced with self.conf, which is a team-aware ExecutorConf instance created in the base executor. This ensures that each team's executor reads team-specific configuration values (e.g., heartbeat interval, purge intervals) without affecting other teams.
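The config-isolation pattern can be sketched in plain Python. All names below (`TeamConf`, the override dict, the config keys) are illustrative stand-ins, not the actual `ExecutorConf` API:

```python
# Plain-Python stand-in for the team-aware config described above.
# TeamConf and its keys are illustrative, not the real ExecutorConf API.
class TeamConf:
    def __init__(self, team_name=None, overrides=None):
        self.team_name = team_name
        self._overrides = overrides or {}          # per-team (section, key) values
        self._defaults = {("edge", "heartbeat_interval"): "30"}  # shared defaults

    def get(self, section, key):
        # A team-specific value wins; otherwise fall back to the shared default,
        # so one team's settings never leak into another team's executor.
        return self._overrides.get((section, key), self._defaults[(section, key)])

team_a = TeamConf("team_a", {("edge", "heartbeat_interval"): "10"})
team_b = TeamConf("team_b")
print(team_a.get("edge", "heartbeat_interval"))  # 10
print(team_b.get("edge", "heartbeat_interval"))  # 30
```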

Multi-instance isolation

Determining the right approach for isolating resources between teams in EdgeExecutor was not straightforward. Looking at the CeleryExecutor as a reference, it achieves full team isolation by assigning each team a separate broker and separate Celery worker pool. Based on this, I concluded that edge workers should also be partitioned per team.

However, unlike CeleryExecutor which uses external brokers, EdgeExecutor manages all persistence through shared DB tables. This means team-level isolation needs to happen at the query level. Specifically, the maintenance operations (_purge_jobs, _update_orphaned_jobs, _check_worker_liveness) were previously operating on all rows in these tables indiscriminately. In a multi-team setup where each team may have different configuration values, this could lead to one team's executor incorrectly purging another team's jobs or marking another team's workers as dead.

To address this, I introduced _managed_queues -- a per-instance set that tracks which queues this executor is responsible for. It is initialized with the default_queue from the (possibly team-specific) config and grows as queue_workload() is called. All maintenance queries now filter by WHERE queue IN (_managed_queues), and worker liveness checks skip workers whose registered queues do not overlap with the executor's managed queues.

This approach assumes that each team uses a distinct set of queues and that different teams do not share the same queue names.
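The queue-scoped filtering described above can be sketched as follows. The real code uses SQLAlchemy queries against the edge_job table; the class and data here are illustrative stand-ins:

```python
# Plain-Python sketch of the queue-scoped maintenance filter (the real code
# runs SQLAlchemy queries against edge_job; names here are illustrative).
class EdgeExecutorSketch:
    def __init__(self, default_queue):
        # seeded from the (possibly team-specific) default_queue config
        self._managed_queues = {default_queue}

    def queue_workload(self, queue):
        # the set grows as workloads are queued, as described above
        self._managed_queues.add(queue)

    def purge_jobs(self, all_jobs):
        # equivalent of WHERE queue IN (_managed_queues): this instance only
        # touches rows for queues it manages
        return [job for job in all_jobs if job["queue"] in self._managed_queues]

jobs = [
    {"queue": "team1_default", "state": "success"},
    {"queue": "team2_default", "state": "success"},
]
team1 = EdgeExecutorSketch("team1_default")
print([j["queue"] for j in team1.purge_jobs(jobs)])  # ['team1_default']
```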

Questions

  1. Are queues guaranteed to be unique across teams? For example, could team_1 and team_2 both use a queue named "default"? If so, the current queue-based isolation would break down.
  2. If queue uniqueness across teams is intended, is there a mechanism to enforce it? The queue parameter is set by users at the DAG/task level, so there is nothing currently preventing two teams from accidentally (or intentionally) using the same queue name.
  3. Alternatively, would it be more appropriate to add a team_name column to EdgeJobModel / EdgeWorkerModel for explicit team-level filtering, rather than relying on queue-based inference?

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


@jscheffl
Contributor

jscheffl commented Feb 8, 2026

Not sure whether AI concluded this:

Specifically, the maintenance operations (_purge_jobs, _update_orphaned_jobs, _check_worker_liveness) were previously operating on all rows in these tables indiscriminately. In a multi-team setup where each team may have different configuration values, this could lead to one team's executor incorrectly purging another team's jobs or marking another team's workers as dead.

This is WRONG. jobs are only purged on completion. Not while being in the queue. So should not be a problem. I would doubt that different configurations per team are needed.

I'd assume that in case Edge is needed that workers are started per team. Whereas central tables are shared.

Shall we make the PR "draft" until questions are clear?

Regarding the questions:

  1. Are queues guaranteed to be unique across teams? For example, could team_1 and team_2 both use a queue named "default"? If so, the current queue-based isolation would break down. --> Not an expert in the multi team setup ... @o-nikolas can you provide an answer?
  2. If queue uniqueness across teams is intended, is there a mechanism to enforce it? The queue parameter is set by users at the DAG/task level, so there is nothing currently preventing two teams from accidentally (or intentionally) using the same queue name. --> same answer bucket.
  3. Alternatively, would it be more appropriate to add a team_name column to EdgeJobModel / EdgeWorkerModel for explicit team-level filtering, rather than relying on queue-based inference? --> Without knowing the answers from above... I assume so. Which would be a blocker until the DB migration tooling is integrated into Edge, which is still pending from Introduce EdgeDBManager: Independent Provider Specific Database Schema Management #61155 - before this is merged we should not extend the DB schema.

Contributor

@jscheffl jscheffl left a comment

Blocking merge. Besides the changes I do not agree with, there is ZERO documentation added on how to make a multi-team setup possible. Also I assume the DB schema must be adjusted, depending on whether queues overlap.

Member

@dheerajturaga dheerajturaga left a comment

Given that Edge executor is unique in persisting all state to shared tables (edge_job, edge_worker), the isolation approach needs to be discussed and agreed upon with the AIP-67 authors and the Edge provider maintainers before writing the implementation and not left as open questions in the PR body. I'd suggest starting that conversation first, since implementing a correct solution would involve modifying the table schemas for edge. What has been done for Celery can't be simply applied as is.

@wjddn279
Contributor Author

wjddn279 commented Feb 9, 2026

This is WRONG. jobs are only purged on completion. Not while being in the queue. So should not be a problem. I would doubt that different configurations per team are needed.

I see my message wasn't conveyed as I intended.

This could lead to one team's executor incorrectly purging another team's jobs or marking another team's workers as dead

This shouldn't happen. The AI mistranslated it.🥲 Sorry for that.

However, the cleanup process in edge_executor's multi-instance environment runs on every instance and checks all rows (regardless of team). This can cause side effects, because the checks run at unintended intervals compared to a single-instance setup. I thought it would be better if each instance only ran them for the jobs it manages.

Therefore, we needed a minimum unit to distinguish teams in that table (edge_job), and the best option I thought might be possible (without changing the table) was to distinguish by queue. That's why I left a separate question asking if that was actually feasible. I also tried implementing it in that direction for now. Of course, I'm open to changing the approach.

@wjddn279
Contributor Author

wjddn279 commented Feb 9, 2026

@jscheffl @dheerajturaga

Thank you for the review. Actually, I examined several cases but couldn't determine the exact direction, so I implemented it in what I thought was the best approach, and I'm of course willing to change it according to the decision. I should have checked first...

I'll change the PR to draft.

@wjddn279 wjddn279 marked this pull request as draft February 9, 2026 01:16
@o-nikolas
Contributor

o-nikolas commented Feb 9, 2026

@jscheffl @wjddn279

Regarding the questions:

1. Are queues guaranteed to be unique across teams? For example, could team_1 and team_2 both use a queue named "default"? If so, the current queue-based isolation would break down. --> Not an expert in the multi team setup ... @o-nikolas can you provide an answer?

The queue param has always been an odd duck. Originally hardcoded into Airflow for the Celery Executor specifically, then abused for the old concrete hybrid executors (to specify which executor a task would be sent to). It's now only used by CeleryExecutor and EdgeExecutor (as far as I know). I would personally like to see this thing deprecated in favour of using the executor_config, but let's at least not abuse it yet again. It is not yet team-ified and I don't have any specific plans to do so. It is a property of a task, which is team-aware, so I'm not sure what benefit making it team-aware would be or how that would look. Tasks will be sent to the correct executor instance for their team inside the scheduler (which is team-aware) well before the queue property is ever evaluated inside the executor, so I'm not sure how it would help in this situation.

3. Alternatively, would it be more appropriate to add a team_name column to EdgeJobModel / EdgeWorkerModel for explicit team-level filtering, rather than relying on queue-based inference? --> Without knowing the answers from above... I assume so. Which would be a blocker until the DB migration tooling is integrated into Edge, which is still pending from [Add EdgeDBManager for provider-specific database migrations #61155](https://github.com/apache/airflow/pull/61155) - before this is merged we should not extend the DB schema.

I believe this is the best answer. Let's not abuse queue again and implement this right 😄

@wjddn279
Contributor Author

@o-nikolas @jscheffl @dheerajturaga

So to summarize your points — can I confirm that we've reached an agreement on adding a team_name column to the EdgeJobModel / EdgeWorkerModel tables?

If so, I'll update the worker startup to allow specifying a team_name to belong to, alongside the existing queue subscription. A worker will only execute a job if it belongs to the same team_name and the job is delivered through one of the specified queues.
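The proposed matching rule can be sketched as a simple predicate. The helper below is illustrative, not code from the PR; `None` stands for the "no team" case:

```python
# Sketch of the proposed matching rule (illustrative helper, not PR code):
# a worker takes a job only when team_name matches exactly, with None
# meaning "no team", and the job arrives on a subscribed queue.
def worker_accepts(worker_team, worker_queues, job_team, job_queue):
    return job_team == worker_team and job_queue in worker_queues

print(worker_accepts("team_1", {"gpu", "default"}, "team_1", "gpu"))  # True
print(worker_accepts("team_1", {"gpu"}, "team_2", "gpu"))             # False
print(worker_accepts(None, {"default"}, None, "default"))             # True (no-team scope)
```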

@jscheffl
Contributor

@o-nikolas @jscheffl @dheerajturaga

So to summarize your points — can I confirm that we've reached an agreement on adding a team_name column to the EdgeJobModel / EdgeWorkerModel tables?

If so, I'll update the worker startup to allow specifying a team_name to belong to, alongside the existing queue subscription. A worker will only execute a job if it belongs to the same team_name and the job is delivered through one of the specified queues.

Yes, and - I assume this holds for others as well - Multi Team is an optional feature, so the CLI param and column are only optionally respected: probably NULL for most setups, and this needs to be respected as well.

Good thing about the delayed discussion is that meanwhile the DB manager for table migrations has been merged in #61155, so if you need a column you can add the first migration there now.

@wjddn279 wjddn279 force-pushed the edge-worker-multi-team branch from 1951cbf to 67c6926 Compare February 23, 2026 09:12
@wjddn279 wjddn279 marked this pull request as ready for review February 23, 2026 09:19
@wjddn279
Contributor Author

@o-nikolas @jscheffl @dheerajturaga

The task is complete and ready for review.

The following work has been done:

  1. Wrote a migration to add the team_name column.
  2. Enabled multi-instance support for the existing executor per team_name. The scope of tasks for each worker is now limited to the same team_name.
  3. Updated the commands and API endpoints related to existing workers to optionally accept team_name, and modified them to be scoped to the same team_name.

Notes

When each worker starts, team_name1, team_name2, and no team are treated as 3 separate team scopes, with "no team" meaning the team_name column is NULL.
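This three-scope behavior can be sketched as follows. The real filter is a DB query; the data and helper below are illustrative stand-ins:

```python
# Stand-in for the three separate team scopes described above: each team_name,
# including "no team" (NULL/None), is its own scope and never sees other
# scopes' rows. Illustrative only; the real filter is a DB query.
workers = [
    {"name": "w1", "team_name": "team_name1"},
    {"name": "w2", "team_name": "team_name2"},
    {"name": "w3", "team_name": None},  # no team: a separate third scope
]

def workers_in_scope(all_workers, team_name):
    # None matches only NULL team_name rows, never "any team"
    return [w["name"] for w in all_workers if w["team_name"] == team_name]

print(workers_in_scope(workers, "team_name1"))  # ['w1']
print(workers_in_scope(workers, None))          # ['w3']
```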

@wjddn279
Contributor Author

Also, it seems like there are no integration tests for this. Would it be okay if I write some when I have time?

@jscheffl
Contributor

Also, it seems like there are no integration tests for this. Would it be okay if I write some when I have time?

Integration tests for Edge are a long-standing item on my bucket list. Never had time to make them. I'd be very, very happy about a contribution. Maybe in a separate PR.

Otherwise some back-compat tests are failing and static checks need fixing...

Contributor

@jscheffl jscheffl left a comment

Some more comments. In general and structurally looking very good already!

hostname: str,
queues: list[str] | None,
free_concurrency: int,
team_name: str | None = None,
Contributor

Mhm, I am thinking a bit about security. So we can enable multi-team of course, but at the moment the authentication of the worker is limited to one shared secret for all teams. So while technically this is a separation, in terms of security it is kind of void, as the user can just switch the team name in the deployment.

This would need to be added to docs as restriction.


_REVISION_HEADS_MAP: dict[str, str] = {
"3.0.0": "9d34dfc2de06",
"3.0.2": "a09c3ee8e1d3",
Contributor

I assume multi-team is a feature, so this will land in version 3.1.0 (not 3.0.2) - unfortunately you need to make it "forward looking".

) -> None:
"""Add queues to an edge worker."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
query = get_query_filter_by_team_and_worker_name(worker_name, team_name)
Contributor

I was thinking a moment... knowing that team name is an optional column, the worker name is still the primary key. So adding team name to the parameters and query just "ensures" that a worker is only affected if the correct team is provided. But actually the team is not needed, as the worker name still needs to be unique.

Contributor Author

@jscheffl
I've been thinking about this a bit.

Even if worker_name itself is unique, I think it would be better if commands or API calls don't work correctly unless the team_name the worker belongs to is explicitly specified.

As you suggested, if team_name is not required, anyone who knows the host_name would be able to affect workers belonging to other teams. Of course, the same is possible if the team_name is known, but I think it would be worth providing at least a minimal level of isolation.
@o-nikolas WDYT?

Contributor

Yes sure. I think for the moment we do not need to be super-paranoid. I assume this is mostly an administrator option, assuming the admin has access to all infrastructure as well. It is nothing that would be delegated down to team members. That would open a can of worms for roles and administration... maybe in the future, but not in this PR.

Also the comment above was mostly "thinking loud", not a blocking argument.

Contributor

Agreed with @jscheffl here. As long as this isn't something done by teams then we don't need to be too paranoid. It is assumed that administrators will have access to all teams (someone needs to be at the top setting these things up). However, explicit can be nice, I don't feel strongly either way 🙂

Comment on lines 212 to 213
assert "3.0.2" in _REVISION_HEADS_MAP
assert _REVISION_HEADS_MAP["3.0.2"] == "a09c3ee8e1d3"
Contributor

3.1.0 here as well....

@jscheffl
Contributor

Forgot to mention: There are zero docs. Can you add a description about multi-team to the RST docs as well - especially also highlighting the security restriction for the time being?

@wjddn279 wjddn279 force-pushed the edge-worker-multi-team branch 2 times, most recently from d55c3f9 to e7400d6 Compare February 25, 2026 09:09
@wjddn279 wjddn279 force-pushed the edge-worker-multi-team branch 2 times, most recently from 40972e8 to 4a29779 Compare February 26, 2026 08:00
@wjddn279 wjddn279 force-pushed the edge-worker-multi-team branch from bf98bba to cdba6d9 Compare February 26, 2026 12:38
Contributor

@jscheffl jscheffl left a comment

Looks good now for me. @dheerajturaga can you also make a second pass review if OK for you as well?

Member

@dheerajturaga dheerajturaga left a comment

@wjddn279 Would you be able to update the description of this PR to something more meaningful about the feature? Current description looks to be AI generated metadata that isn't capturing the spirit of this PR

Member

@dheerajturaga dheerajturaga left a comment

Trying to run some backward compatibility checks and found some issues (unrelated to this PR). Otherwise this looks good. I need to resolve the other issue before running a full set of backward compat checks on this one.

Unblocking merge with request to update PR description with the correct details before merge

Labels

area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3
