Skip to content

Conversation

@joaopamaral
Copy link
Contributor

@joaopamaral joaopamaral commented Jan 9, 2024

When a task is removed and is returned in the last position of get_task_instances, the callback request will fail because it will try to access this task, but it doesn't exist anymore.

  • Airflow 2.7.1:
ERROR - Error executing DagCallbackRequest callback for file: /dags/daily.py
Traceback (most recent call last):
  File "airflow/dag_processing/processor.py", line 711, in execute_callbacks
    self._execute_dag_callbacks(dagbag, request, session)
  File "airflow/utils/session.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "airflow/dag_processing/processor.py", line 745, in _execute_dag_callbacks
    dag.handle_callback(
  File "airflow/utils/session.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "airflow/models/dag.py", line 1422, in handle_callback
    ti.task = self.get_task(ti.task_id)
  File "airflow/models/dag.py", line 2496, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task removed_task not found

This exception was noticed in airflow 2.7.1 but the issue is still in the latest version

To fix it we are filtering out the removed tasks to avoid having a removed task in the last position again.

  • After
>>> tis = dailydag.get_task_instances()
>>> dailydag.get_task(tis[-1].task_id)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "airflow/models/dag.py", line 2496, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task removed_task not found
  • Before
>>> tis = [ti for ti in dailydag.get_task_instances() if ti.state != TaskInstanceState.REMOVED]
>>> dailydag.get_task(tis[-1].task_id)
<Task(EmptyOperator): existing_task>

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for your change?

@dirrao dirrao added area:core kind:bug This is a clearly a bug labels Jan 10, 2024
@joaopamaral joaopamaral marked this pull request as ready for review January 10, 2024 14:13
@eladkal eladkal added type:bug-fix Changelog: Bug Fixes and removed kind:bug This is a clearly a bug labels Jan 10, 2024
@eladkal eladkal added this to the Airflow 2.8.1 milestone Jan 10, 2024
@eladkal
Copy link
Contributor

eladkal commented Jan 13, 2024

Tests are failing
Can you look into it @joaopamaral ?

@ephraimbuddy ephraimbuddy merged commit 8c1c09b into apache:main Jan 15, 2024
ephraimbuddy pushed a commit that referenced this pull request Jan 15, 2024
…k instance list (#36693)

* Fix Callback exception when a removed task is the last one in the task instance list

* Add test_dag_handle_callback_with_removed_task

* Remove extra break line

* Merge TIs filters

* Fix static check

* Revert changes

(cherry picked from commit 8c1c09b)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core type:bug-fix Changelog: Bug Fixes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants