AIP 67 - Multi-Team: Update Edge Executor to support multi team #61646
wjddn279 wants to merge 6 commits into apache:main
Conversation
Not sure whether AI concluded this:
This is WRONG: jobs are only purged on completion, not while sitting in the queue, so this should not be a problem. I would doubt that different configurations per team are needed. I'd assume that if Edge is needed, workers are started per team, whereas the central tables are shared. Shall we make the PR a "draft" until the questions are clear? Regarding the questions:
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
jscheffl
left a comment
Blocking merge. Besides the changes I do not agree with, there is ZERO documentation added on how to make a multi-team setup possible. Also I assume the DB schema must be adjusted, depending on whether queues overlap.
dheerajturaga
left a comment
Given that the Edge executor is unique in persisting all state to shared tables (edge_job, edge_worker), the isolation approach needs to be discussed and agreed upon with the AIP-67 authors and the Edge provider maintainers before writing the implementation, not left as open questions in the PR body. I'd suggest starting that conversation first, since implementing a correct solution would involve modifying the Edge table schemas. What has been done for Celery can't simply be applied as-is.
I see my message wasn't conveyed as I intended.
This shouldn't happen; the AI mistranslated it. 🥲 Sorry for that. However, the cleanup process in edge_executor's multi-instance environment runs on all instances and performs checks on all rows (regardless of team). This can cause side effects, because it runs at unintended intervals compared to when there is only one instance. I thought it would be better if it only ran for the jobs managed by that particular instance. Therefore, we needed a minimum unit to distinguish teams in that table (edge_job), and the best option I thought possible (without changing the table) was to distinguish by queue. That's why I left a separate question asking whether that was actually feasible. I also tried implementing it in that direction for now. Of course, I'm open to changing the approach.
Thank you for the review. Actually, I examined several cases but couldn't determine the exact direction, so I implemented what I thought was the best approach, and I'm of course willing to change it according to the decision. I should have checked first... I'll change the PR to draft.
I believe this is the best answer. Let's not abuse queue again and implement this right 😄
@o-nikolas @jscheffl @dheerajturaga To summarize your points: can I confirm that we've reached an agreement on adding a `team_name` column? If so, I'll update the worker startup to allow specifying a team_name to belong to, alongside the existing queue subscription. A worker will only execute a job if it belongs to the same team_name and the job is delivered through one of the specified queues.
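The matching rule summarized above can be sketched as a simple predicate. This is an illustrative sketch only — the function and field names below are assumptions, not the provider's actual API — showing the "same team AND subscribed queue" condition, with team treated as optional (NULL/None for non-multi-team setups):

```python
def worker_accepts(worker_team, worker_queues, job_team, job_queue):
    """Hypothetical sketch: a worker picks up a job only when the
    (optional) team matches AND the job's queue is one it subscribes to."""
    # Multi-team is optional: in a single-team setup both sides leave
    # team unset (None), so only the queue subscription decides.
    if worker_team != job_team:
        return False
    return job_queue in worker_queues

print(worker_accepts("team_a", {"q1"}, "team_a", "q1"))  # True: team and queue match
print(worker_accepts("team_a", {"q1"}, "team_b", "q1"))  # False: other team's job
print(worker_accepts(None, {"q1"}, None, "q2"))          # False: queue mismatch
```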
Yes, and - I assume this is true for others as well - Multi Team is an optional feature, so the CLI param and column are only optionally respected: probably NULL for most setups, and this is to be respected as well. A good thing about the delayed discussion is that meanwhile the DB manager for table migrations has been merged in #61155, so if you need a column you can add the first migration there now.
@o-nikolas @jscheffl @dheerajturaga The task is complete and ready for review. The following work has been done:
Notes: When each worker starts, cases like …
Also, it seems like there are no integration tests for this. Would it be okay if I write some when I have time?
Integration tests for Edge are a long-standing item on my bucket list; I never had time to make them. I'd be very, very happy about a contribution, maybe in a separate PR. Otherwise, some back-compat tests are failing and static checks need fixing...
jscheffl
left a comment
Some more comments. In general and structurally looking very good already!
```python
hostname: str,
queues: list[str] | None,
free_concurrency: int,
team_name: str | None = None,
```
Mhm, I am thinking a bit about security. We can enable multi-team of course, but at the moment the authentication of the worker is limited to one shared secret for all teams. So while technically this is a separation, in terms of security it is kind of void, as the user can just switch the team name in the deployment.
This would need to be added to the docs as a restriction.
```python
_REVISION_HEADS_MAP: dict[str, str] = {
    "3.0.0": "9d34dfc2de06",
    "3.0.2": "a09c3ee8e1d3",
```
I assume multi-team is a feature, so this will then render version 3.1.0 (not 3.0.2) - unfortunately you need to make it "forward looking".
```python
) -> None:
    """Add queues to an edge worker."""
    # before: query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
    query = get_query_filter_by_team_and_worker_name(worker_name, team_name)
```
I was thinking a moment... knowing that team name is an optional column the worker name is still the primary key. So adding team name to parameters and query just "ensures" that a worker is affected if correct team is provided. But actually team is not needed as the worker name still needs to be unique.
@jscheffl
I've been thinking about this a bit.
Even if worker_name itself is unique, I think it would be better if commands or API calls don't work correctly unless the team_name the worker belongs to is explicitly specified.
As you suggested, if team_name is not required, anyone who knows the host_name would be able to affect workers belonging to other teams. Of course, the same is possible if the team_name is known, but I think it would be worth providing at least a minimal level of isolation.
@o-nikolas WDYT?
Yes sure. I think for the moment we do not need to be super-paranoid. I assume this is mostly an administrator option assuming the admin has access to all infrastructure as well. It is nothing that would be delegated down to team members. That would open a box of worms for roles and administration... maybe in the future but not in this PR.
Also the comment above was mostly "thinking loud", not a blocking argument.
Agreed with @jscheffl here. As long as this isn't something done by teams then we don't need to be too paranoid. It is assumed that administrators will have access to all teams (someone needs to be at the top setting these things up). However, explicit can be nice, I don't feel strongly either way 🙂
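The behavior debated above — worker_name stays the unique key while the optional team_name acts as an extra guard so a command only takes effect when the caller names the right team — can be illustrated with a minimal stand-in. This is a sketch over plain dicts, not the real `get_query_filter_by_team_and_worker_name` in the provider:

```python
def get_query_filter_by_team_and_worker_name(workers, worker_name, team_name=None):
    """Hypothetical stand-in for the DB query: worker_name is unique,
    but the row only matches when the caller also supplies the worker's
    team_name (None matches workers with no team set)."""
    return [
        w for w in workers
        if w["worker_name"] == worker_name and w.get("team_name") == team_name
    ]

workers = [
    {"worker_name": "w1", "team_name": "team_a"},
    {"worker_name": "w2", "team_name": None},  # non-multi-team setup
]

# Matches only when the correct team is named explicitly:
print(get_query_filter_by_team_and_worker_name(workers, "w1", "team_a"))
# Without the team, the command affects nothing:
print(get_query_filter_by_team_and_worker_name(workers, "w1"))  # []
```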
```python
assert "3.0.2" in _REVISION_HEADS_MAP
assert _REVISION_HEADS_MAP["3.0.2"] == "a09c3ee8e1d3"
```
Forgot to mention: there are zero docs. Can you add a description about multi-team to the RST docs as well, especially highlighting the security restriction for the time being?
jscheffl
left a comment
Looks good now for me. @dheerajturaga can you also make a second pass review if OK for you as well?
dheerajturaga
left a comment
@wjddn279 Would you be able to update the description of this PR to something more meaningful about the feature? The current description looks to be AI-generated metadata that doesn't capture the spirit of this PR.
dheerajturaga
left a comment
Trying to run some backward-compatibility checks, I found some issues (unrelated to this PR). Otherwise this looks good. I need to resolve the other issue before running a full set of backward-compat checks on this one.
Unblocking merge with request to update PR description with the correct details before merge
Config isolation
No major issues here. Following the same pattern as other executors (LocalExecutor, CeleryExecutor), all direct reads from the global conf have been replaced with self.conf, which is a team-aware ExecutorConf instance created in the base executor. This ensures that each team's executor reads team-specific configuration values (e.g., heartbeat interval, purge intervals) without affecting other teams.
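The team-aware config resolution described above can be sketched as follows. This is a simplified stand-in under stated assumptions — the real `ExecutorConf` lives in the base executor and reads Airflow's config; the class, keys, and fallback behavior below are illustrative only:

```python
class ExecutorConf:
    """Hypothetical stand-in: resolves a config key for a team,
    falling back to the global (team-less) value."""

    def __init__(self, global_conf, team_name=None):
        # global_conf modeled as {(team_or_None, section, key): value}
        self._global = global_conf
        self._team = team_name

    def get(self, section, key, fallback=None):
        if self._team is not None:
            team_key = (self._team, section, key)
            if team_key in self._global:
                return self._global[team_key]  # team-specific override
        return self._global.get((None, section, key), fallback)

global_conf = {
    (None, "edge", "job_poll_interval"): "5",
    ("team_a", "edge", "job_poll_interval"): "2",  # team_a overrides
}

conf_a = ExecutorConf(global_conf, team_name="team_a")
conf_b = ExecutorConf(global_conf, team_name="team_b")
print(conf_a.get("edge", "job_poll_interval"))  # "2": team override wins
print(conf_b.get("edge", "job_poll_interval"))  # "5": falls back to global
```

The point mirrored here is that each team's executor sees its own values (heartbeat, purge intervals) without the executors interfering with each other.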
Multi-instance isolation
Determining the right approach for isolating resources between teams in EdgeExecutor was not straightforward. Looking at the CeleryExecutor as a reference, it achieves full team isolation by assigning each team a separate broker and separate Celery worker pool. Based on this, I concluded that edge workers should also be partitioned per team.
However, unlike CeleryExecutor which uses external brokers, EdgeExecutor manages all persistence through shared DB tables. This means team-level isolation needs to happen at the query level. Specifically, the maintenance operations (_purge_jobs, _update_orphaned_jobs, _check_worker_liveness) were previously operating on all rows in these tables indiscriminately. In a multi-team setup where each team may have different configuration values, this could lead to one team's executor incorrectly purging another team's jobs or marking another team's workers as dead.
To address this, I introduced `_managed_queues`, a per-instance set that tracks which queues this executor is responsible for. It is initialized with the `default_queue` from the (possibly team-specific) config and grows as `queue_workload()` is called. All maintenance queries now filter by `WHERE queue IN (_managed_queues)`, and worker liveness checks skip workers whose registered queues do not overlap with the executor's managed queues.
This approach assumes that each team uses a distinct set of queues and that different teams do not share the same queue names.
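The queue-scoped maintenance described above can be sketched in miniature. This is not the provider's real code — `EdgeJob`, `EdgeExecutorSketch`, and `purge_candidates` below are simplified stand-ins — but it shows how filtering on `_managed_queues` keeps one instance's cleanup away from another team's rows:

```python
from dataclasses import dataclass

@dataclass
class EdgeJob:
    """Simplified stand-in for a row in the shared edge_job table."""
    key: str
    queue: str
    state: str

class EdgeExecutorSketch:
    def __init__(self, default_queue):
        # seeded from the (team-specific) default_queue, grows per workload
        self._managed_queues = {default_queue}

    def queue_workload(self, job):
        self._managed_queues.add(job.queue)

    def purge_candidates(self, jobs):
        # mirrors WHERE queue IN (_managed_queues): only this instance's
        # queues are considered, so other teams' jobs are never touched
        return [
            j for j in jobs
            if j.queue in self._managed_queues and j.state == "success"
        ]

jobs = [
    EdgeJob("a", "team_a_q", "success"),
    EdgeJob("b", "team_b_q", "success"),  # another team's job: untouched
]
ex = EdgeExecutorSketch(default_queue="team_a_q")
print([j.key for j in ex.purge_candidates(jobs)])  # ['a']
```

As the PR description notes, this only isolates correctly under the assumption that teams use disjoint queue names.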
Questions
Was generative AI tooling used to co-author this PR?
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in `airflow-core/newsfragments`.