-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIP-67 - Multi Team: Update Celery Executor to support multi team #60675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-67 - Multi Team: Update Celery Executor to support multi team #60675
Conversation
Updating the Celery executor to read from team based config and also support multiple instances running concurrently. The latter is the largest source of changes. Much of the celery configuration (both Airflow config and Celery config) was module based. Modules are cached and shared in Python. So the majority of the changes are moving that module level config code to be function based (while trying to also maintain backwards compatibility). The way Celery tasks are sent to workers also changed as a consequence of this. Since sending tasks is parallelized with multiple processes (which do not share memory with the parent) the send task logic now re-creates a celery app in the sub processes (since the pickling and unpickling that python does to try pass state to the sub processes was not reliably creating the correct celery app objects).
dheerajturaga
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the enhancements! the current implementation introduces static type errors that will fail our CI pipeline. I have provided inline patches to correct the type signatures and ensure compliance with our MyPy configuration.
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
Outdated
Show resolved
Hide resolved
providers/celery/tests/integration/celery/test_celery_executor.py
Outdated
Show resolved
Hide resolved
|
@o-nikolas I have tested the general functionality of celery worker with your changes as well as tested out the CLI. Things are working as expected. However, I don't see how I can test the |
As inferred by the presence of the correct ExecutorConf methods being available.
Thanks for the thorough review @dheerajturaga! I've addressed those issues (slightly differently for one than your suggested patch). I'm currently struggling with back compat tests. It's slow/difficult because those tests to do not run successfully in breeze on my laptop. So I have to push to the PR to test each change. As far as testing with the --team flag. For this you have to have a full multi-team setup, which we don't have great documentation for yet (coming soon). The most helpful testing is actually on the backcompat side (testing with airflow 2.11 and 3.1.X) |
providers/celery/src/airflow/providers/celery/executors/celery_executor.py
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py
Outdated
Show resolved
Hide resolved
dheerajturaga
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@o-nikolas , I have run backward compatibility checks. Things work good in 2.11.0 however when I tried this with 3.1.3 I found issues. There are incomplete API contract in the ExecutorConf class between Airflow versions.
dheerajturaga
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully the final set of changes needed to be consistent. Everything else looks good.
providers/celery/src/airflow/providers/celery/cli/definition.py
Outdated
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/cli/celery_command.py
Outdated
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/cli/celery_command.py
Outdated
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/cli/celery_command.py
Outdated
Show resolved
Hide resolved
providers/celery/src/airflow/providers/celery/cli/celery_command.py
Outdated
Show resolved
Hide resolved
Co-authored-by: Dheeraj Turaga <dheerajturaga@gmail.com>
Request for changes has been left after fixes were applied.
dheerajturaga
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! Thanks so much for your patience, I know there was a lot of back and forth. Current changes look good!
Updating the Celery executor to read from team based config and also support multiple instances running concurrently.
The latter is the largest source of changes. Much of the celery configuration (both Airflow config and Celery config) was module based. Modules are cached and shared in Python. So the majority of the changes are moving that module level config code to be function based (while trying to also maintain backwards compatibility).
The way Celery tasks are sent to workers also changed as a consequence of this. Since sending tasks is parallelized with multiple processes (which do not share memory with the parent) the send task logic now re-creates a celery app in the sub processes (since the pickling and unpickling that python does to try pass state to the sub processes was not reliably creating the correct celery app objects).
Was generative AI tooling used to co-author this PR?
Cline
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.