-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Fix DAG callbacks missing dag_run in context #53654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix DAG callbacks missing dag_run in context #53654
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
33ed343 to
a2f2c4d
Compare
| "dag": dag, | ||
| "run_id": str(self.run_id), | ||
| "reason": reason, | ||
| "dag_run": self, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't going to fix it. As the docstring says, this function is only used in dag.test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change needs to happen in https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/dag_processing/processor.py#L211
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check #53058 for similar pattern
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finally I managed to get the dag_run information without querying from the database and pass it on until I can set it in the callback context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, although since we need more keys, I was working on it in parallel since it became a blocker for some. Check #53684
a2f2c4d to
6cf6912
Compare
78519d7 to
5cbf762
Compare
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
| def serialize_for_callback(self) -> dict[str, Any]: | ||
| """ | ||
| Serialize DagRun object into a dictionary for callback requests. | ||
|
|
||
| This method creates a serialized representation of the DagRun that can be | ||
| safely passed to subprocesses without requiring database access. | ||
|
|
||
| :return: Dictionary containing serialized DagRun information | ||
| """ | ||
| return { | ||
| "dag_id": self.dag_id, | ||
| "run_id": self.run_id, | ||
| "state": self.state, | ||
| "logical_date": self.logical_date.isoformat() if self.logical_date else None, | ||
| "start_date": self.start_date.isoformat() if self.start_date else None, | ||
| "end_date": self.end_date.isoformat() if self.end_date else None, | ||
| "conf": self.conf, | ||
| "run_type": self.run_type, | ||
| "run_after": self.run_after.isoformat() if self.run_after else None, | ||
| "data_interval_start": self.data_interval_start.isoformat() if self.data_interval_start else None, | ||
| "data_interval_end": self.data_interval_end.isoformat() if self.data_interval_end else None, | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this, check #53684
That should take care of that and restore the Airflow 2 behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean that the entire PR is no longer needed because it is being addressed in #53684 or just that part of the code you highlighted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Entire PR. And I do apologize though about the time spent here. I can certainly say this will be useful context for future contributions. I do really appreciate you taking the time and hope you will continue contributing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing this issue!
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to #53058 Fixes #52824 Fixes #51402 Closes #51949 Related to #53654 Related to #53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to #53058 Fixes #52824 Fixes #51402 Closes #51949 Related to #53654 Related to #53618 (cherry picked from commit ef80507)
This ensures DAG callbacks receive the same rich context as task callbacks, improving consistency and providing access to template variables and macros similar to Airflow 2. This has been a blocker for few users similar to apache#53058 Fixes apache#52824 Fixes apache#51402 Closes apache#51949 Related to apache#53654 Related to apache#53618
closes: #53618
related: #52824
Description
on_success_callbackandon_failure_callback) were missing thedag_runobject in their context, making it impossible for users to access important DAG run information in their callback functions.dag_runin the DAG callbacks context which means this is fixing a breaking change.Problem
Currently, when DAG callbacks are executed, the context only contains:
dag: The DAG objectrun_id: The run ID as a stringreason: The reason for the callbackThis is inconsistent with task callbacks and with previous versions of Airflow, which have access to the full
dag_runobject.Solution
The solution addresses the Airflow 3.0 constraint that disallows direct ORM access in subprocesses by serializing the
dag_rundata and passing it through the callback request:dag_runfield toDagCallbackRequest: The request now accepts serialized DagRun data as a dictionaryDagRun.serialize_for_callback()method: Converts DagRun objects to dictionaries with all necessary fieldsdagrun.pyandscheduler_job_runner.pyto use the new serialization method_execute_dag_callbacks: Now includesdag_runin the context when providedContexttype now supports bothDagRunProtocoland dictionary typesChanges Made
DagCallbackRequest: Added optionaldag_runfield to store serialized DagRun dataDagRun.serialize_for_callback(): New method that converts DagRun objects to dictionaries with ISO-formatted datetime stringsdagrun.py: Updatedhandle_dag_callbackto useserialize_for_callback()when creating callback requestsscheduler_job_runner.py: Updated to include serializeddag_rundata in callback requestsprocessor.py: Modified_execute_dag_callbacksto includedag_runin context when providedContexttype to support bothDagRunProtocoland dictionary typesTechnical Implementation
The solution avoids the Airflow 3.0 ORM restriction by:
dag_rundata at the source (when callbacks are created)DagCallbackRequestTesting
test_callback_requests.pyfor the newdag_runfieldtest_dagrun.pyfor theserialize_for_callback()methodtest_processor.pyfor callback execution withdag_rundataBreaking Changes
dag_rundata continue to work as beforedag_runfield is only added to the context when explicitly providedRelated Issues
contextdoes not havedag_runset in Airflow 3 but it is set in Airflow 2 and where users reported that dag_run was missing from DAG callback contexts.Checklist