-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix running task marked as failed on Celery broker redelivery #58441 #60855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix running task marked as failed on Celery broker redelivery #58441 #60855
Conversation
116a330 to
c991bc6
Compare
| ) | ||
| except Exception as e: | ||
| if TaskAlreadyRunningError is not None and isinstance(e, TaskAlreadyRunningError): | ||
| log.info("[%s] Task already running elsewhere, ignoring redelivered message", celery_task_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a new feature or did something like this exist in Airflow 2.x?
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear to me why we need this "Ignore" behaviour?
| if isinstance(detail, dict) and "detail" in detail: | ||
| detail = detail["detail"] | ||
| if ( | ||
| isinstance(detail, dict) | ||
| and detail.get("reason") == "invalid_state" | ||
| and detail.get("previous_state") == "running" | ||
| ): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is rather fragile and not a good idea to write things this way.
| return exit_code | ||
| except TaskAlreadyRunningError: | ||
| # Let the executor handle this (e.g., Celery will ignore it) | ||
| log.info("Exiting due to broker redelivery", task_instance_id=str(ti.id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"broker" as a concept only exist for Celery, so this log message doesn't make sense for other executors.
Summary
TaskAlreadyRunningErrorIgnore()to prevent state reportingcloses #58441
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.