Fix DAG callbacks missing dag_run in context #53654
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1188,6 +1188,7 @@ def recalculate(self) -> _UnfinishedStates: | |
| bundle_version=self.bundle_version, | ||
| is_failure_callback=True, | ||
| msg="task_failure", | ||
| dag_run=self.serialize_for_callback(), | ||
| ) | ||
|
|
||
| # Check if the max_consecutive_failed_dag_runs has been provided and not 0 | ||
|
|
@@ -1217,6 +1218,7 @@ def recalculate(self) -> _UnfinishedStates: | |
| bundle_version=self.bundle_version, | ||
| is_failure_callback=False, | ||
| msg="success", | ||
| dag_run=self.serialize_for_callback(), | ||
| ) | ||
|
|
||
| if (deadline := dag.deadline) and isinstance(deadline.reference, DeadlineReference.TYPES.DAGRUN): | ||
|
|
@@ -1240,6 +1242,7 @@ def recalculate(self) -> _UnfinishedStates: | |
| bundle_version=self.bundle_version, | ||
| is_failure_callback=True, | ||
| msg="all_tasks_deadlocked", | ||
| dag_run=self.serialize_for_callback(), | ||
| ) | ||
|
|
||
| # finally, if the leaves aren't done, the dag is still running | ||
|
|
@@ -1356,6 +1359,7 @@ def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: str = " | |
| "dag": dag, | ||
| "run_id": str(self.run_id), | ||
| "reason": reason, | ||
| "dag_run": self, | ||
| } | ||
|
|
||
| callbacks = dag.on_success_callback if success else dag.on_failure_callback | ||
|
|
@@ -2013,6 +2017,29 @@ def _get_log_template(log_template_id: int | None, session: Session = NEW_SESSIO | |
| def _get_partial_task_ids(dag: DAG | None) -> list[str] | None: | ||
| return dag.task_ids if dag and dag.partial else None | ||
|
|
||
| def serialize_for_callback(self) -> dict[str, Any]: | ||
| """ | ||
| Serialize DagRun object into a dictionary for callback requests. | ||
|
|
||
| This method creates a serialized representation of the DagRun that can be | ||
| safely passed to subprocesses without requiring database access. | ||
|
|
||
| :return: Dictionary containing serialized DagRun information | ||
| """ | ||
| return { | ||
| "dag_id": self.dag_id, | ||
| "run_id": self.run_id, | ||
| "state": self.state, | ||
| "logical_date": self.logical_date.isoformat() if self.logical_date else None, | ||
| "start_date": self.start_date.isoformat() if self.start_date else None, | ||
| "end_date": self.end_date.isoformat() if self.end_date else None, | ||
| "conf": self.conf, | ||
| "run_type": self.run_type, | ||
| "run_after": self.run_after.isoformat() if self.run_after else None, | ||
| "data_interval_start": self.data_interval_start.isoformat() if self.data_interval_start else None, | ||
| "data_interval_end": self.data_interval_end.isoformat() if self.data_interval_end else None, | ||
| } | ||
|
|
||
|
Comment on lines +2020 to +2042
Member
We don't need this; check #53684. That should take care of it and restore the Airflow 2 behavior.
Author
Do you mean that the entire PR is no longer needed because it is being addressed in #53684, or just the part of the code you highlighted?
Member
Entire PR. I do apologize for the time spent here, though. I can certainly say this will be useful context for future contributions. I really appreciate you taking the time and hope you will continue contributing.
Author
Thanks for addressing this issue! |
||
|
|
||
| class DagRunNote(Base): | ||
| """For storage of arbitrary notes concerning the dagrun instance.""" | ||
|
|
||
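The `serialize_for_callback` idea from the diff above can be tried in isolation. The following is only a minimal sketch: `FakeDagRun` and `_iso` are hypothetical stand-ins introduced here for illustration, not Airflow classes or helpers, and only a subset of the fields from the diff is included. It shows why the timestamps are converted with `isoformat()`: the resulting dictionary survives a JSON round trip, so it can be handed to another process without database access.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _iso(value: datetime | None) -> str | None:
    """Return an ISO 8601 string, or None when the timestamp is unset (hypothetical helper)."""
    return value.isoformat() if value else None


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a DagRun; NOT the Airflow model."""

    dag_id: str
    run_id: str
    state: str
    logical_date: datetime | None = None
    start_date: datetime | None = None
    end_date: datetime | None = None
    conf: dict[str, Any] = field(default_factory=dict)

    def serialize_for_callback(self) -> dict[str, Any]:
        # Only plain, JSON-friendly values go into the payload.
        return {
            "dag_id": self.dag_id,
            "run_id": self.run_id,
            "state": self.state,
            "logical_date": _iso(self.logical_date),
            "start_date": _iso(self.start_date),
            "end_date": _iso(self.end_date),
            "conf": self.conf,
        }


run = FakeDagRun(
    dag_id="example_dag",
    run_id="manual__2025-07-01",
    state="failed",
    logical_date=datetime(2025, 7, 1, tzinfo=timezone.utc),
    conf={"retries": 2},
)

# Round-trip through JSON to confirm the payload carries no live objects.
restored = json.loads(json.dumps(run.serialize_for_callback()))
```

Unset timestamps stay `None` rather than raising, which mirrors the conditional expressions in the diff.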
This isn't going to fix it. As the docstring says, this function is only used in dag.test
The change needs to happen in https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/dag_processing/processor.py#L211
Check #53058 for a similar pattern.
I finally managed to get the dag_run information without querying the database and to pass it along until it can be set in the callback context.
Awesome. Since we need more keys, though, I was working on it in parallel because it became a blocker for some. Check #53684.
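For readers following the thread, here is a rough sketch of how a DAG-level callback might consume the context once `dag_run` is present. It is plain Python and does not import Airflow. The function name `notify_on_failure` and the sample dictionaries are hypothetical. The keys `run_id`, `reason`, and `dag_run` are taken from the diff in this PR. Whether `dag_run` arrives as an object or as a serialized dictionary depends on the code path, so the sketch tolerates both, as well as the missing-key case this PR set out to fix.

```python
from __future__ import annotations

from typing import Any


def notify_on_failure(context: dict[str, Any]) -> str:
    """Build a short message from a callback context (hypothetical callback)."""
    run_id = context.get("run_id", "<unknown>")
    reason = context.get("reason", "")
    dag_run = context.get("dag_run")

    if dag_run is None:
        # The situation described in the PR title: no dag_run in the context.
        return f"run {run_id} failed ({reason}); dag_run missing from context"

    # Accept either a serialized dict or an object exposing a `state` attribute.
    if isinstance(dag_run, dict):
        state = dag_run.get("state")
    else:
        state = getattr(dag_run, "state", None)
    return f"run {run_id} failed ({reason}); state={state}"


# Sample contexts, made up for illustration only.
with_dag_run = notify_on_failure(
    {"run_id": "manual__1", "reason": "task_failure", "dag_run": {"state": "failed"}}
)
without_dag_run = notify_on_failure({"run_id": "manual__1", "reason": "task_failure"})
```

Using `context.get` instead of indexing keeps the callback from raising `KeyError` on versions where the key is absent.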