Trigger event from deferred task does not get scheduled immediately, leading to timeout. #44759

Open
1 of 2 tasks
Birne94 opened this issue Dec 7, 2024 · 8 comments · May be fixed by #45158

Birne94 commented Dec 7, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

We are using deferrable operators to run jobs in Databricks. These jobs use a common database, so we use a task pool to limit concurrency to 1 task. The pool counts deferred tasks against its slots (Include Deferred is enabled). In some cases we see task timeouts even though the deferred task finished successfully. In the log below, roughly 1.5 hours pass between the trigger event (14:10:42) and the task being picked up again (15:38:27):

[2024-12-06, 14:01:10 CET] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=my-dag, task_id=my-task, execution_date=20241205T130000, start_date=20241206T130108
[2024-12-06, 14:01:10 CET] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
[2024-12-06, 14:01:11 CET] {{base.py:83}} INFO - Using connection ID 'databricks' for task execution.
[2024-12-06, 14:01:11 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'PENDING', 'result_state': '', 'state_message': 'Waiting for cluster'}. sleeping for 30 seconds
...
[2024-12-06, 14:09:42 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:12 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:42 CET] {{triggerer_job_runner.py:602}} INFO - Trigger my-dag/scheduled__2024-12-05T13:00:00+00:00/my-task/-1/1 (ID 10030) fired: TriggerEvent<{'run_id': 847717920033451, 'run_page_url': '...', 'run_state': '{"life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": ""}'}>
[2024-12-06, 15:38:27 CET] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: my-dag.my-task scheduled__2024-12-05T13:00:00+00:00 [queued]>
...
[2024-12-06, 15:38:27 CET] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 425, in _execute_task
    raise AirflowTaskTimeout()
airflow.exceptions.AirflowTaskTimeout

We assume the following happens:

  • Many tasks are waiting to be executed but are limited by the pool.
  • A task starts running and is deferred (pool slot is consumed).
  • The deferred task runs in the triggerer (pool slot remains consumed).
  • The trigger emits its event and stops (pool slot is released).
  • As soon as the pool slot is released, another waiting task starts running (pool slot is consumed again).
  • The post-deferral continuation of the original task is scheduled but cannot run, because no pool slot is available.
  • Once the task that was scheduled in between finishes and releases the slot, the post-deferral continuation runs and times out immediately.
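The sequence above can be illustrated with a toy, minute-based model of a single-slot pool that serves waiting requests first come, first served. This is not Airflow's scheduler: the names `SlotRequest` and `run_single_slot_pool` are hypothetical, the FIFO policy ignores priority weights, and the timings are made up. The model also assumes that the execution timeout keeps counting from the task's original start across the deferral, which would explain why the timeout only surfaced when the task finally resumed.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(order=True)
class SlotRequest:
    queued_at: int                                   # minute the request joins the pool queue
    name: str = field(compare=False)
    hold_minutes: int = field(compare=False)         # how long the request keeps the slot
    deadline: Optional[int] = field(default=None, compare=False)  # absolute timeout (assumption)


def run_single_slot_pool(requests):
    """Toy FIFO model of a pool with one slot.

    Returns {name: (start_minute, outcome)}. A request whose deadline has
    already passed when it finally gets the slot is reported as "timeout"
    and releases the slot immediately.
    """
    free_at = 0
    results = {}
    for req in sorted(requests):  # FIFO by queue time; sorting is stable on ties
        start = max(free_at, req.queued_at)
        if req.deadline is not None and start >= req.deadline:
            results[req.name] = (start, "timeout")
            free_at = start
        else:
            results[req.name] = (start, "ok")
            free_at = start + req.hold_minutes
    return results


# Task A starts at minute 0 with a 60-minute timeout and defers; because deferred
# tasks count against the pool, it holds the slot until its trigger fires at minute 10.
# Task B has been waiting since minute 1 and needs 90 minutes.
scenario = [
    SlotRequest(queued_at=0, name="A (deferred)", hold_minutes=10),
    SlotRequest(queued_at=1, name="B", hold_minutes=90),
    SlotRequest(queued_at=10, name="A (resumed)", hold_minutes=1, deadline=60),
]
print(run_single_slot_pool(scenario))
# {'A (deferred)': (0, 'ok'), 'B': (10, 'ok'), 'A (resumed)': (100, 'timeout')}
```

B grabs the slot the moment A's trigger fires, so A's continuation only starts at minute 100, well past its deadline, matching the "times out immediately" symptom.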

What you think should happen instead?

I see multiple things that could improve this behavior:

  • Tasks waking up after deferral do not consume slots within task pools.
  • Tasks waking up have priority over other tasks when making scheduling decisions.
  • Tasks waking up have their own timeout for the post-deferral execution.
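The second proposal (resumed tasks get priority) amounts to a different ordering of the pool queue. The sketch below is a hypothetical ordering function, not Airflow code: `QueuedTask` and `order_for_pool` are made-up names, and the tie-breakers (higher weight first, then oldest first) only mirror the general idea of Airflow's `priority_weight`.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class QueuedTask:
    name: str
    priority_weight: int   # higher is scheduled earlier
    queued_at: int         # minute the task entered the queue
    resumed: bool = False  # True for a post-deferral continuation


def order_for_pool(queue: List[QueuedTask]) -> List[str]:
    """Hypothetical ordering: resumed tasks first, then by weight, then FIFO."""
    ranked = sorted(
        queue,
        # False sorts before True, so resumed tasks come first
        key=lambda t: (not t.resumed, -t.priority_weight, t.queued_at),
    )
    return [t.name for t in ranked]


queue = [
    QueuedTask("B", priority_weight=5, queued_at=1),
    QueuedTask("C", priority_weight=1, queued_at=2),
    QueuedTask("A (resumed)", priority_weight=1, queued_at=10, resumed=True),
]
print(order_for_pool(queue))  # ['A (resumed)', 'B', 'C']
```

A resumed task only jumps ahead of tasks that are still waiting; nothing already holding the slot is preempted.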

How to reproduce

  • Create a DAG with many deferrable tasks sharing a single task pool.
  • Set the pool's slot count to 1 and enable Include Deferred.
  • Observe that a new task is sometimes scheduled before the post-deferral continuation of a deferred task.

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Birne94 added the area:core, kind:bug and needs-triage labels on Dec 7, 2024

tirkarthi commented

If you are not doing anything in execute_complete and are on track to upgrade to 2.10.0, exiting the task directly from the trigger might help.

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers


Birne94 commented Dec 9, 2024

> If you are not doing anything on execute_complete and on track to upgrade to 2.10.0 maybe exit task directly from trigger could help.
>
> https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#exiting-deferred-task-from-triggers

Thank you for that hint, that sounds interesting. We need to apply a bit of logic before exiting and publish the task results to XCom. From the code I can see that this case would be supported; however, it is not mentioned in the docs. Is that feature stable already?


tirkarthi commented Dec 9, 2024

It was added in 2.10 and is stable there. You can set self.xcoms, which will get pushed:

def _push_xcoms_if_necessary(self, *, task_instance: TaskInstance) -> None:

TaskSuccessEvent and TaskFailureEvent are the two events that can be used to end the task instance directly. This marks the task with the state task_instance_state and optionally pushes xcom if applicable. Here’s an example of how to use these events:

Maybe the docs could include some examples or explain this feature better.
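The flow described above (the trigger emits a success event carrying XComs, and the event marks the task as done without resuming it on a worker) can be mimicked with a stdlib-only mock. Everything below is hypothetical: `FakeTaskInstance`, `FakeTaskSuccessEvent` and `fake_job_trigger` are stand-ins, not Airflow classes. In Airflow 2.10 the real pieces are `TaskSuccessEvent` and a trigger's async `run()` method, whose exact signatures should be checked against the linked docs.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Dict, Optional


@dataclass
class FakeTaskInstance:
    state: str = "deferred"
    xcom: Dict[str, Any] = field(default_factory=dict)

    def xcom_push(self, key: str, value: Any) -> None:
        self.xcom[key] = value


@dataclass
class FakeTaskSuccessEvent:
    xcoms: Optional[Dict[str, Any]] = None
    task_instance_state: str = "success"

    def handle_submit(self, ti: FakeTaskInstance) -> None:
        # End the task directly: no worker resume, so no pool slot is needed
        ti.state = self.task_instance_state
        for key, value in (self.xcoms or {}).items():
            ti.xcom_push(key=key, value=value)


async def fake_job_trigger(
    poll: Callable[[], str], run_id: int, interval: float = 0.0
) -> AsyncIterator[FakeTaskSuccessEvent]:
    """Poll a job until it succeeds, then emit a terminal event carrying XComs."""
    while True:
        state = poll()
        if state == "SUCCESS":
            # Any "bit of logic" needed before exiting would run here, in the trigger
            yield FakeTaskSuccessEvent(xcoms={"run_id": run_id, "result_state": state})
            return
        await asyncio.sleep(interval)


async def main() -> FakeTaskInstance:
    states = iter(["PENDING", "RUNNING", "SUCCESS"])
    ti = FakeTaskInstance()
    async for event in fake_job_trigger(lambda: next(states), run_id=123):
        event.handle_submit(ti)
    return ti


ti = asyncio.run(main())
print(ti.state, ti.xcom)  # success {'run_id': 123, 'result_state': 'SUCCESS'}
```

Because the task finishes inside the triggerer, there is no post-deferral continuation left to wait for a pool slot.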


Birne94 commented Dec 10, 2024

Thank you @tirkarthi, we will see if we can prioritize upgrading our MWAA environment to 2.10 and testing this approach.

I assume that the described behavior (post-deferral task execution being delayed) is expected in this case, and that changing it would be more of a QoL improvement than a bug fix, right?

eladkal added the good first issue and kind:documentation labels and removed the kind:bug and needs-triage labels on Dec 10, 2024

eladkal commented Dec 10, 2024

Given what @tirkarthi mentioned, I'm changing this to a docs-only issue.
PRs to improve the docs are welcome.

avyuktsoni0731 commented

Hey @eladkal
I'd like to contribute the documentation improvements for this issue. I'm new to Apache Airflow, so could you guide me on how to get started? Where can I find the documentation I should contribute to?

Help would be much appreciated.
Thanks


eladkal commented Dec 21, 2024

@avyuktsoni0731 check https://github.com/apache/airflow/blob/main/contributing-docs/README.rst

avyuktsoni0731 commented

@eladkal I've created a PR. I tried to address the issue as I understood it; please let me know if any other changes are required and I'll look into them.
