fix: Add AirflowRunFacet to AF3 tasks with externally changed state #60583
+85
−19
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There is still one case in Airflow 3, where the
_on_task_instance_*listener method is being called on scheduler and as a result, is using DB TaskInstance model instead ofRuntimeTaskInstance. The call is here, when external task state change is being handled, theti.fetch_handle_failure_contextis callingget_listener_manager().hook.on_task_instance_failed. Currently, for complete/fail listener methods, for DB TaskInstance model, we're calling the_on_task_instance_manual_state_changepath, which is generally correct, as it does not call any user code (as opposed to_on_task_instance_successand_on_task_instance_failedthat may call user custom run facets function and operator OL method). So to correctly handle this scheduler call, I've kept it as manual state change (we should maybe rename it to external, but it's still relevant) and I just added some checks that will create airflowrunfacet in that scenario, since we have all the models there (we still don't on api server). Added appropriate comments and tests.So TLDR; when task state is externally changed the event will be emitted from scheduler as it is now but with additional AirflowRunFacet when possible.
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.