
Always provide a relevant TI context in Dag callback #61274

Open

Asquator wants to merge 16 commits into apache:main from Asquator:bugfix/improve-dag-callback-context

Conversation


@Asquator Asquator commented Jan 31, 2026

This PR strives to make sense of the TI contexts passed to Dag callbacks. Rather than always passing the last task lexicographically, the new logic passes the last "relevant" task instance to the Dag's failure callback.

Behavior outline:

  1. On regular failure, pass the latest finished task out of the ones that could have contributed to the failure.
  2. On timeout, pass the task that started last out of the unfinished tasks.
  3. On "tasks deadlocked", pass the latest finished task.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    deepwiki.com was used to find the relevant logic in the repo, and an embedded chat was used to make a skeleton for TestDagRunGetFirstTiCausingFailure.

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding a dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@Asquator Asquator requested review from XD-DENG and ashb as code owners January 31, 2026 00:33
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jan 31, 2026
@Asquator Asquator changed the title Always provide a relevant task instance in Dag callback Always provide a relevant TI context in Dag callback Jan 31, 2026
.where(TI.dag_id == dag_run.dag_id)
.where(TI.run_id == dag_run.run_id)
.where(TI.state.in_(State.unfinished))
.where(TI.state.in_(State.unfinished) | (TI.state.is_(None)))
Author

@Asquator Asquator Jan 31, 2026

I thought that tasks with a null state should also always be marked as skipped (the in_ clause won't fetch them because of SQL NULL semantics).

Contributor

None is in unfinished states, so there is no need to change anything here

Author

From what I tried, the in_ clause doesn't catch the None value, which is in accordance with SQL semantics for NULL values. See also: https://stackoverflow.com/questions/6362112/in-clause-with-null-or-is-null
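
To illustrate with a self-contained SQLAlchemy snippet (table and values are invented for the demo; this is not Airflow code):

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DemoTask(Base):
    __tablename__ = "demo_task"
    id = Column(Integer, primary_key=True)
    state = Column(String, nullable=True)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([DemoTask(state="running"), DemoTask(state=None)])
    session.commit()

    # IN never matches NULL rows, even if None were included in the Python list,
    # because "state IN (...)" evaluates to NULL (not TRUE) for a NULL state.
    via_in = session.scalars(
        select(DemoTask).where(DemoTask.state.in_(["running", "queued"]))
    ).all()

    # An explicit IS NULL predicate is needed to pick up the NULL-state row.
    via_in_or_null = session.scalars(
        select(DemoTask).where(DemoTask.state.in_(["running", "queued"]) | DemoTask.state.is_(None))
    ).all()

    print(len(via_in), len(via_in_or_null))  # 1 2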

@Asquator Asquator marked this pull request as draft February 4, 2026 19:43
@Asquator Asquator marked this pull request as ready for review February 4, 2026 20:26
Author

Asquator commented Feb 4, 2026

I want to get some feedback on the behavior: whether it's the desired one, or whether anything can be made better.

I will update the docs to explain the behavior of passed TIs to dag callbacks.

Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks good. I have left a few comments on the changes; overall, great find!

.where(TI.state.in_(State.unfinished))
.where(TI.state.in_(State.unfinished) | (TI.state.is_(None)))
).all()
last_unfinished_ti = (
Contributor

What if it was a mapped task? Or what if there were concurrently running tasks? How do you decide?

Since the end date is not the only factor, the start date may be a better option here. Or maybe even return a few tasks if they were running concurrently, and check the dependencies of the task rather than relying solely on the end date.

Author

Well, start_date is exactly the field I'm using here. On timeout, we assume there are unfinished tasks, which means they have no end_date yet.
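
As a rough sketch of what that selection looks like (unfinished_tis is a hypothetical list of the run's unfinished TaskInstances, not the PR's actual variable):

# Unfinished TIs have no end_date yet, so pick the one that started most recently;
# TIs that never started (start_date is None) are skipped.
started_tis = [ti for ti in unfinished_tis if ti.start_date is not None]
last_started_ti = max(started_tis, key=lambda ti: ti.start_date, default=None)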


if not failed_leaf_tis:
    return None

Contributor

Why is the "if" necessary?
Maybe we can just return an empty array at the end if the check yields no results?

Author

It's a shortcut to avoid the logic below.

In the end, I use the min function, which doesn't operate on empty collections. The default argument catches that, and yet there's no reason to scan the DAG if for some reason we have no failed tasks. I think I should add a warning there, because this function shouldn't be called on non-failed DAGs.
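
For reference, min() raises a ValueError on an empty sequence unless a default is supplied, which is what the early return (and the default=None) guard against:

empty = []
# min(empty, key=...) would raise: ValueError: min() arg is an empty sequence
safe = min(empty, key=lambda ti: ti.end_date, default=None)
print(safe)  # None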

Comment on lines 1441 to 1453
# Collect all task IDs on failure paths
failure_path_task_ids = set()
for failed_leaf in failed_leaf_tis:
    leaf_task = dag.get_task(failed_leaf.task_id)
    upstream_ids = leaf_task.get_flat_relative_ids(upstream=True)
    failure_path_task_ids.update(upstream_ids)
    failure_path_task_ids.add(failed_leaf.task_id)

# Find failed tasks on possible failure paths
failed_on_paths = [
    ti for ti in tis
    if ti.task_id in failure_path_task_ids and ti.state == State.FAILED
]
Contributor

This part confuses me a little: why do we get ALL task instances (tis), just to get all of the previous task instances of the failed tasks (failure_path_task_ids), only to end up with the failed task instances again?

if ti.task_id in failure_path_task_ids and ti.state == State.FAILED
]

return min(failed_on_paths, key=lambda ti: ti.end_date, default=None)
Contributor

Why shouldn't we return multiple tasks if a few of them were running concurrently?

And why filter by end_date and not start_date?

Author

@Asquator Asquator Feb 6, 2026

Why shouldn't we return multiple tasks if a few of them were running concurrently?

Unfortunately, the current Context has to provide at least one TI under the "ti" key, even in DAG failure callbacks. I don't know whether that's good or bad (probably bad), but I decided to adhere to this rule for backward compatibility. I think the context user can still access the other tasks in the DAG, can't he?

And why filter by end_date and not start_date?

A good point. Logically, I wanted to return the task that ended last and could have caused the DAG to fail. On second thought, maybe it would be wiser to filter by start_date, since many tasks will be failed manually after the DAG fails.
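
For context on the "ti" key discussion above, a dag-level failure callback receives a single TaskInstance under "ti" but can still reach the rest of the run through the dag_run object; a rough sketch (callback body is illustrative and not part of this PR):

def on_dag_failure(context):
    ti = context["ti"]            # the single TI chosen by the scheduler
    dag_run = context["dag_run"]  # the rest of the run is still reachable
    for other_ti in dag_run.get_task_instances():
        print(other_ti.task_id, other_ti.state)

# Hypothetical wiring: DAG(..., on_failure_callback=on_dag_failure)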

)
elif dag.has_on_failure_callback:
    last_finished_ti: TI | None = (
        max(info.finished_tis, key=lambda ti: ti.end_date, default=None)
Contributor

Why do we make this check over and over again?
Can't we do it once?

Author

@Asquator Asquator Feb 7, 2026

We do this check once for a finished DAG. What do you mean?

@provide_session
def get_last_ti(self, dag: SerializedDAG, session: Session = NEW_SESSION) -> TI | None:
"""Get Last TI from the dagrun to build and pass Execution context object from server to then run callbacks."""
def get_first_ti_causing_failure(self, dag: SerializedDAG, session: Session = NEW_SESSION) -> TI | None:
Contributor

Minor nit, but maybe the name of the method could be changed to get_first_ti_causing_dagrun_failure.

# This ensures that we will only use the accessible TI
# context for the callback.

failed_leaf_tis = [
Contributor

So "leaf" here means the leaf task of a given dagrun? If so, maybe a better name would be last_failed_tasks.


Labels

area:Scheduler including HA (high availability) scheduler

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DAG on_failure_callback uses wrong context

2 participants