-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix running task marked as failed on Celery broker redelivery #58441 #60855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,7 +59,7 @@ | |
| XComSequenceIndexResponse, | ||
| ) | ||
| from airflow.sdk.configuration import conf | ||
| from airflow.sdk.exceptions import ErrorType | ||
| from airflow.sdk.exceptions import ErrorType, TaskAlreadyRunningError | ||
| from airflow.sdk.execution_time import comms | ||
| from airflow.sdk.execution_time.comms import ( | ||
| AssetEventsResult, | ||
|
|
@@ -1007,9 +1007,23 @@ def _on_child_started( | |
| ti_context = self.client.task_instances.start(ti.id, self.pid, start_date) | ||
| self._should_retry = ti_context.should_retry | ||
| self._last_successful_heartbeat = time.monotonic() | ||
| except Exception: | ||
| except Exception as e: | ||
| # On any error kill that subprocess! | ||
| self.kill(signal.SIGKILL) | ||
|
|
||
| # Handle broker redelivery: task already running on another worker | ||
| if isinstance(e, ServerResponseError) and e.response.status_code == 409: | ||
| # FastAPI wraps HTTPException detail in {"detail": {...}} | ||
| detail = e.detail | ||
| if isinstance(detail, dict) and "detail" in detail: | ||
| detail = detail["detail"] | ||
| if ( | ||
| isinstance(detail, dict) | ||
| and detail.get("reason") == "invalid_state" | ||
| and detail.get("previous_state") == "running" | ||
| ): | ||
|
Comment on lines
+1018
to
+1024
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is rather fragile and not a good idea to write things this way. |
||
| log.warning("Task already running, likely broker redelivery", task_instance_id=str(ti.id)) | ||
| raise TaskAlreadyRunningError(f"Task {ti.id} is already running") from e | ||
| raise | ||
|
|
||
| msg = StartupDetails.model_construct( | ||
|
|
@@ -2088,6 +2102,10 @@ def supervise( | |
| final_state=process.final_state, | ||
| ) | ||
| return exit_code | ||
| except TaskAlreadyRunningError: | ||
| # Let the executor handle this (e.g., Celery will ignore it) | ||
| log.info("Exiting due to broker redelivery", task_instance_id=str(ti.id)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "broker" as a concept only exist for Celery, so this log message doesn't make sense for other executors. |
||
| raise | ||
| finally: | ||
| if log_path and log_file_descriptor: | ||
| log_file_descriptor.close() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a new feature or did something like this exist in Airflow 2.x?