Always provide a relevant TI context in Dag callback #61274
Asquator wants to merge 16 commits into apache:main
Conversation
  .where(TI.dag_id == dag_run.dag_id)
  .where(TI.run_id == dag_run.run_id)
- .where(TI.state.in_(State.unfinished))
+ .where(TI.state.in_(State.unfinished) | (TI.state.is_(None)))
I thought that tasks with a null state should also always be marked as skipped (the `in_` clause won't fetch them, because of SQL semantics).
None is in unfinished states, so there is no need to change anything here
From what I tried, the `in` clause doesn't catch the None value; that's in accordance with SQL semantics for NULL values. See also: https://stackoverflow.com/questions/6362112/in-clause-with-null-or-is-null
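To make the SQL point concrete, here is a minimal, self-contained SQLAlchemy sketch (a toy table, not Airflow's actual TI model) showing that `in_` alone misses rows whose state is NULL:

```python
from sqlalchemy import Column, Integer, String, create_engine, or_, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Task(Base):
    __tablename__ = "task"
    id = Column(Integer, primary_key=True)
    state = Column(String, nullable=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Task(state="running"), Task(state=None)])
    session.commit()

    # NULL never compares equal to anything, so `state IN (...)` is not
    # TRUE for the NULL row, and that row gets filtered out.
    in_only = session.scalars(
        select(Task).where(Task.state.in_(["running", "queued"]))
    ).all()

    # OR-ing in an explicit IS NULL test also catches the NULL row.
    with_null = session.scalars(
        select(Task).where(
            or_(Task.state.in_(["running", "queued"]), Task.state.is_(None))
        )
    ).all()

    assert len(in_only) == 1
    assert len(with_null) == 2
```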
I want to get some feedback on the behavior: whether it's the desired one, or whether anything can be made better. I will update the docs to explain the behavior of TIs passed to Dag callbacks.
Nataneljpwd left a comment:
Looks good! I've left a few comments on the changes; overall, a great find!
- .where(TI.state.in_(State.unfinished))
+ .where(TI.state.in_(State.unfinished) | (TI.state.is_(None)))
  ).all()
  last_unfinished_ti = (
What if it was a mapped task? Or what if there were concurrently running tasks: how do you decide? The end date is not the only factor, so the start date may be a better option here. Or maybe even return a few tasks if they were running concurrently, and check the dependencies of the task rather than relying solely on the end date.
Well, start_date is exactly the field I'm using here. On timeout, we assume there are unfinished tasks, which means they have no end_date yet.
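A minimal sketch of the selection this reply describes (plain Python, illustrative names only): on timeout the candidate TIs are still unfinished, so they have no end_date, and start_date is the only usable ordering key:

```python
from datetime import datetime, timezone
from typing import NamedTuple, Optional

class FakeTI(NamedTuple):
    task_id: str
    start_date: datetime
    end_date: Optional[datetime]

# Two still-running TIs at the moment the dagrun times out: neither has
# an end_date yet, so we order by start_date and take the most recent.
tis = [
    FakeTI("extract", datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc), None),
    FakeTI("load", datetime(2024, 1, 1, 9, 30, tzinfo=timezone.utc), None),
]
last_unfinished_ti = max(tis, key=lambda ti: ti.start_date, default=None)
assert last_unfinished_ti.task_id == "load"
```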
  if not failed_leaf_tis:
      return None
Why is the "if" necessary?
Maybe we can just return an empty array at the end if the check yields no results?
It's a shortcut to avoid the logic below.
In the end I use the min function, which doesn't operate on empty collections; the default argument catches that. Still, there's no reason to scan the DAG if for some reason we have no failed tasks. I think I should add a warning there, because this function shouldn't be called on non-failed DAGs.
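For reference, the `min(..., default=...)` behavior this reply relies on; without `default`, `min` raises `ValueError` on an empty collection:

```python
failed_on_paths: list = []

# An empty candidate list would make bare min() raise ValueError;
# default=None turns that into "no causing TI found" instead.
assert min(failed_on_paths, key=lambda ti: ti.end_date, default=None) is None
```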
  # Collect all task IDs on failure paths
  failure_path_task_ids = set()
  for failed_leaf in failed_leaf_tis:
      leaf_task = dag.get_task(failed_leaf.task_id)
      upstream_ids = leaf_task.get_flat_relative_ids(upstream=True)
      failure_path_task_ids.update(upstream_ids)
      failure_path_task_ids.add(failed_leaf.task_id)

  # Find failed tasks on possible failure paths
  failed_on_paths = [
      ti for ti in tis
      if ti.task_id in failure_path_task_ids and ti.state == State.FAILED
  ]
This part confuses me a little: why do we get ALL task instances (tis), just to get all of the upstream task instances of the failed tasks (failure_path_task_ids), just to get the failed task instances again?
      if ti.task_id in failure_path_task_ids and ti.state == State.FAILED
  ]

  return min(failed_on_paths, key=lambda ti: ti.end_date, default=None)
Why shouldn't we return multiple tasks if a few of them were running concurrently?
And why filter by end_date and not start_date?
> Why shouldn't we return multiple tasks if a few of them were running concurrently?

Unfortunately, the current Context has to provide at least one TI under the "ti" key, even in DAG failure callbacks. I don't know whether that's good or bad (probably bad), but I decided to adhere to this rule for backward compatibility. I think the context user can still access the other tasks in the DAG, can't he?

> And why filter by end_date and not start_date?

A good point. Logically, I wanted to return the task that ended last and could have caused the DAG to fail. On second thought, maybe it would be wiser to filter by start_date, since many tasks will be failed manually after the DAG fails.
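A toy illustration of that trade-off (hypothetical TIs, not from the PR): a task that is failed manually after the run has already failed gets a late end_date, so ordering by end_date and by start_date can pick different "causing" tasks:

```python
from datetime import datetime
from typing import NamedTuple

class FakeTI(NamedTuple):
    task_id: str
    start_date: datetime
    end_date: datetime

failed_on_paths = [
    # Failed on its own at 11:00.
    FakeTI("transform", datetime(2024, 1, 1, 10), datetime(2024, 1, 1, 11)),
    # Started earlier, but was marked failed manually at 12:00.
    FakeTI("extract", datetime(2024, 1, 1, 9), datetime(2024, 1, 1, 12)),
]

by_end = min(failed_on_paths, key=lambda ti: ti.end_date)      # -> "transform"
by_start = min(failed_on_paths, key=lambda ti: ti.start_date)  # -> "extract"
assert (by_end.task_id, by_start.task_id) == ("transform", "extract")
```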
  )
  elif dag.has_on_failure_callback:
      last_finished_ti: TI | None = (
          max(info.finished_tis, key=lambda ti: ti.end_date, default=None)
Why do we make this check over and over again?
Can't we do it once?
We do this check once for a finished DAG. Wdym?
  @provide_session
- def get_last_ti(self, dag: SerializedDAG, session: Session = NEW_SESSION) -> TI | None:
+ def get_first_ti_causing_failure(self, dag: SerializedDAG, session: Session = NEW_SESSION) -> TI | None:
      """Get Last TI from the dagrun to build and pass Execution context object from server to then run callbacks."""
Minor nit, but maybe the name of the method could be changed to get_first_ti_causing_dagrun_failure.
  # This ensures that we will only use the accessible TI
  # context for the callback.

  failed_leaf_tis = [
So "leaf" here means the leaf task for a given dagrun?
If so, maybe a better name could be last_failed_tasks.
This PR strives to make sense of the TI contexts passed to Dag callbacks. Rather than always passing the lexicographically last task, the new logic passes the last task "relevant" to the Dag's failure.
Behavior outline:
Was generative AI tooling used to co-author this PR?
deepwiki.com was used to find the relevant logic in the repo, and an embedded chat was used to make a skeleton for TestDagRunGetFirstTiCausingFailure.
Significant newsfragments should be named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.
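For illustration only, a hypothetical pytest skeleton for such a test class; the method names and scenarios are assumptions drawn from this conversation, not the PR's actual tests (which would build real DagRuns via Airflow's test fixtures):

```python
class TestDagRunGetFirstTiCausingFailure:
    """Hypothetical skeleton; real tests would construct DagRuns and TIs."""

    def test_returns_none_without_failed_leaves(self):
        # With no failed leaf TIs, the helper is expected to short-circuit
        # and return None instead of scanning the DAG.
        ...

    def test_picks_earliest_failed_ti_on_failure_path(self):
        # Given several failed TIs on a failure path, the TI with the
        # smallest end_date should be chosen as the cause of failure.
        ...

    def test_null_state_tis_counted_as_unfinished(self):
        # TIs whose state is NULL must be included by the unfinished
        # filter (the IS NULL clause added in this PR).
        ...
```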