Skip to content

Conversation

@anishgirianish
Copy link
Contributor


Summary

  • Handle broker redelivery gracefully when a task is already running on another worker
  • Detect 409 "already running" response from API and raise TaskAlreadyRunningError
  • Celery executor catches this and raises Ignore() to prevent state reporting

closes #58441


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@anishgirianish anishgirianish force-pushed the fix/58441-fix-celery-task-marked-as-failed branch from 116a330 to c991bc6 Compare January 21, 2026 05:46
)
except Exception as e:
if TaskAlreadyRunningError is not None and isinstance(e, TaskAlreadyRunningError):
log.info("[%s] Task already running elsewhere, ignoring redelivered message", celery_task_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new feature or did something like this exist in Airflow 2.x?

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me why we need this "Ignore" behaviour?

Comment on lines +1018 to +1024
if isinstance(detail, dict) and "detail" in detail:
detail = detail["detail"]
if (
isinstance(detail, dict)
and detail.get("reason") == "invalid_state"
and detail.get("previous_state") == "running"
):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is rather fragile and not a good idea to write things this way.

return exit_code
except TaskAlreadyRunningError:
# Let the executor handle this (e.g., Celery will ignore it)
log.info("Exiting due to broker redelivery", task_instance_id=str(ti.id))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"broker" as a concept only exist for Celery, so this log message doesn't make sense for other executors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Celery: Running task marked as failed on broker redelivery

2 participants