Description
Apache Airflow version
Other Airflow 3 version (please specify below)
If "Other Airflow 3 version" selected, which one?
3.1.3
What happened?
We are using the EMR Containers operators in deferrable mode, with a 20-minute execution timeout on the task. Sometimes the task takes longer than 20 minutes. After the timeout, control returns from the triggerer to the worker pod, but the deferral-timeout exception is not caught when the worker resumes the task. As a result, the job keeps running on EMR instead of being cancelled, which is what happens for a normal task timeout (AirflowTaskTimeout). On retry, this leads to parallel execution of the same workload, which is not expected.
In file `task_runner.py`:

```python
if task.execution_timeout:
    from airflow.sdk.execution_time.timeout import timeout

    # TODO: handle timeout in case of deferral
    timeout_seconds = task.execution_timeout.total_seconds()
    try:
        # It's possible we're already timed out, so fast-fail if true
        if timeout_seconds <= 0:
            raise AirflowTaskTimeout()
        # Run task in timeout wrapper
        with timeout(timeout_seconds):
            result = ctx.run(execute, context=context)
    except AirflowTaskTimeout:
        task.on_kill()
        raise
else:
    result = ctx.run(execute, context=context)
```
We only catch AirflowTaskTimeout here, but the exception raised while resuming the task on the worker is TaskDeferralTimeout:
File: `operator.py`

```python
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
    """Entrypoint method called by the Task Runner (instead of execute) when this task is resumed."""
    from airflow.sdk.exceptions import TaskDeferralError, TaskDeferralTimeout

    if next_kwargs is None:
        next_kwargs = {}
    # __fail__ is a special signal value for next_method that indicates
    # this task was scheduled specifically to fail.
    if next_method == TRIGGER_FAIL_REPR:
        next_kwargs = next_kwargs or {}
        traceback = next_kwargs.get("traceback")
        if traceback is not None:
            self.log.error("Trigger failed:\n%s", "\n".join(traceback))
        if (error := next_kwargs.get("error", "Unknown")) == TriggerFailureReason.TRIGGER_TIMEOUT:
            raise TaskDeferralTimeout(error)
        raise TaskDeferralError(error)
    # Grab the callable off the Operator/Task and add in any kwargs
    execute_callable = getattr(self, next_method)
    return execute_callable(context, **next_kwargs)
```
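The mismatch can be shown with a self-contained sketch. The exception classes and task object below are stand-ins, not the real Airflow classes, and the sketch assumes `TaskDeferralTimeout` does not inherit from `AirflowTaskTimeout`, which matches the behaviour we observe: a handler for `AirflowTaskTimeout` never fires for a deferral timeout, so `on_kill()` is skipped. Catching both exception types (a possible fix, not the project's confirmed approach) restores the cancellation:

```python
# Stand-in exception hierarchy (assumption: TaskDeferralTimeout is NOT
# a subclass of AirflowTaskTimeout).
class AirflowTaskTimeout(Exception): ...
class TaskDeferralError(Exception): ...
class TaskDeferralTimeout(TaskDeferralError): ...


class FakeTask:
    """Stand-in for an operator; records whether on_kill() ran."""

    def __init__(self):
        self.killed = False

    def on_kill(self):
        self.killed = True


def failing_execute():
    # Simulates resume_execution() raising after a trigger timeout.
    raise TaskDeferralTimeout("trigger timeout")


def run(task, execute):
    """Current behaviour: only AirflowTaskTimeout triggers on_kill()."""
    try:
        execute()
    except AirflowTaskTimeout:
        task.on_kill()
        raise


def run_fixed(task, execute):
    """Sketch of a fix: treat a deferral timeout like a normal timeout."""
    try:
        execute()
    except (AirflowTaskTimeout, TaskDeferralTimeout):
        task.on_kill()
        raise


t1 = FakeTask()
try:
    run(t1, failing_execute)
except TaskDeferralError:
    pass
print(t1.killed)  # False: on_kill() never runs, the EMR job keeps going

t2 = FakeTask()
try:
    run_fixed(t2, failing_execute)
except TaskDeferralError:
    pass
print(t2.killed)  # True: on_kill() runs and can cancel the job
```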
What you think should happen instead?
The expectation is that the EMR job is cancelled when the task times out, the same way it is for a normal (non-deferred) AirflowTaskTimeout.
How to reproduce
- Add an EMR Containers operator task to an Airflow DAG with `deferrable=True`
- Let the task hit its `execution_timeout` while deferred
- The job keeps running on EMR instead of being cancelled
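
A minimal DAG along these lines reproduces it for us. The cluster ID, role ARN, release label, and job driver below are placeholders, not values from the report:

```python
# Illustrative repro sketch; all AWS identifiers are placeholders.
from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

with DAG(dag_id="emr_containers_timeout_repro", schedule=None) as dag:
    job = EmrContainerOperator(
        task_id="emr_job",
        name="timeout-repro",
        virtual_cluster_id="vc-xxxxxxxx",  # placeholder
        execution_role_arn="arn:aws:iam::111122223333:role/emr-job-role",  # placeholder
        release_label="emr-6.15.0-latest",
        job_driver={"sparkSubmitJobDriver": {"entryPoint": "s3://bucket/app.py"}},
        deferrable=True,
        # Task times out while deferred; the EMR job is not cancelled.
        execution_timeout=timedelta(minutes=20),
    )
```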
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct