Extracted bundle_cleanup_main() function from local scope to avoid subprocess errors on OSes using spawn()#62528
Open
bluczko wants to merge 6 commits into apache:main
…bprocess errors on OSes using spawn()
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst).
Pull request overview
Adjusts the Celery worker’s stale DAG bundle cleanup subprocess to be compatible with OSes using multiprocessing spawn() by ensuring the subprocess target is picklable (module-level).
Changes:
- Extracts the stale bundle cleanup loop into a module-level `_bundle_cleanup_main()` function.
- Passes `check_interval` into the subprocess via `Process(..., args=(check_interval,))` instead of capturing it from an outer scope.
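Why the target has to live at module level can be shown with `pickle` alone, since the `spawn` start method pickles the `Process` object (including its target) to hand it to the child, and functions are pickled by qualified name. The sketch below is illustrative only: `module_level_target`, `make_local_target`, and `is_picklable` are hypothetical names, not part of the Airflow code.

```python
import pickle


def module_level_target(check_interval: float) -> None:
    """Hypothetical module-level target: pickle can reference it by name."""


def make_local_target():
    """Return a function defined in a local scope, like the pre-fix layout."""

    def local_target() -> None:
        """Hypothetical nested target: not reachable by a module-level name."""

    return local_target


def is_picklable(obj) -> bool:
    """Report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, a local function fails with one
        # of these two exception types.
        return False
    return True


if __name__ == "__main__":
    print("module-level target picklable:", is_picklable(module_level_target))
    print("local target picklable:", is_picklable(make_local_target()))
```

With `fork()` the child inherits the parent's memory, so the target is never pickled and the nested function goes unnoticed; the failure only surfaces on platforms that default to `spawn()`.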
Comments suppressed due to low confidence (2)
providers/celery/src/airflow/providers/celery/cli/celery_command.py:132
- In the cleanup subprocess target, `check_interval` is untyped and the function has no return type annotation. Most helpers in this module are annotated, and adding `check_interval: int` (or `float`) plus `-> None` would improve readability/static checking (and makes it clear the value must be picklable for `spawn`).

    def _bundle_cleanup_main(check_interval):
        from airflow.dag_processing.bundles.base import BundleUsageTrackingManager

        mgr = BundleUsageTrackingManager()
        while True:
            time.sleep(check_interval)
            mgr.remove_stale_bundle_versions()
providers/celery/src/airflow/providers/celery/cli/celery_command.py:162
- `sub_proc.terminate()` can raise `AttributeError: 'NoneType' object has no attribute 'terminate'` if `sub_proc.start()` fails (the process exists but `_popen` is still `None`), which was also visible in the original issue trace. Consider guarding with `if sub_proc and sub_proc.is_alive():` (and optionally `sub_proc.join()` after terminate) so the cleanup path never masks the original exception.

    finally:
        if sub_proc:
            sub_proc.terminate()
gpietryga reviewed Feb 26, 2026
providers/celery/src/airflow/providers/celery/cli/celery_command.py
1d24cc8 to d296411
closes: #62512
This PR slightly modifies the `_run_stale_bundle_cleanup()` context manager:

- `bundle_cleanup_main()` has been extracted from `_run_stale_bundle_cleanup()` to module-level scope and renamed to `_bundle_cleanup_main()` to match the naming convention.
- A `check_interval` parameter was added to `_bundle_cleanup_main()`. The value was previously read from the outer function scope; it is now passed to the function when the subprocess is created.

This change allows the Airflow Celery worker to run on macOS and possibly other OSes that use `spawn()` instead of `fork()` to create subprocesses. The lifecycle of the stale bundle cleanup `Process` target function is now handled properly on these systems.

Was generative AI tooling used to co-author this PR?