Description
Apache Airflow version
2.5.0
What happened
When a DAG is configured with the `dagrun_timeout` parameter and a DAG run fails because it times out, the metric `dagrun.duration.failed.<dag_id>` is not emitted.
What you think should happen instead
According to the documentation, the metric `dagrun.duration.failed.<dag_id>` should capture the milliseconds taken for a DagRun to reach the failed state. It should therefore capture every kind of DAG run failure, including failures caused by a DAG-level timeout.
How to reproduce
Set the `dagrun_timeout` parameter (e.g. `dagrun_timeout=timedelta(seconds=5)`), then add a `BashOperator` task that runs longer than `dagrun_timeout` (e.g. `bash_command='sleep 120'`).
Then check the metrics: `dagrun.duration.failed.<dag_id>` does not capture this DAG run that failed because of the timeout.
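A minimal DAG along these lines reproduces the issue (a sketch only; the `dag_id`, dates, and task name are arbitrary):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical dag_id; any id works. The task sleeps far longer than
# dagrun_timeout, so the scheduler's timeout check fails the run.
with DAG(
    dag_id="dagrun_timeout_metric_repro",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    dagrun_timeout=timedelta(seconds=5),
) as dag:
    BashOperator(
        task_id="sleep_task",
        bash_command="sleep 120",
    )
```

After triggering this DAG, the run ends in the failed state, yet no `dagrun.duration.failed.dagrun_timeout_metric_repro` timing reaches StatsD.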
Operating System
Ubuntu 22.04.1 LTS
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.1
Deployment
Virtualenv installation
Deployment details
No response
Anything else
According to the documentation, the metric `dagrun.duration.failed.<dag_id>` should capture the milliseconds taken for a DagRun to reach the failed state. However, if the DAG run fails because of a DAG-run-level timeout, the metric does not capture the failed run.
I dug into the Airflow code and found the reason.
The timer `dagrun.duration.failed.{self.dag_id}` is emitted in the method `_emit_duration_stats_for_finished_state`. code
```python
def _emit_duration_stats_for_finished_state(self):
    if self.state == State.RUNNING:
        return
    if self.start_date is None:
        self.log.warning("Failed to record duration of %s: start_date is not set.", self)
        return
    if self.end_date is None:
        self.log.warning("Failed to record duration of %s: end_date is not set.", self)
        return

    duration = self.end_date - self.start_date
    if self.state == State.SUCCESS:
        Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
    elif self.state == State.FAILED:
        Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
```
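To confirm that the emission logic itself handles the failed state correctly (so the bug lies in the call path, not here), its guards can be exercised with a stand-alone stub. Everything below (`recorded`, `emit_duration`, the lowercase state strings) is illustrative stand-in code, not Airflow's:

```python
from datetime import datetime, timedelta

recorded = {}  # stand-in for the StatsD backend


class Stats:
    """Minimal stub of the Stats interface, used only for this sketch."""

    @staticmethod
    def timing(name, delta):
        recorded[name] = delta


def emit_duration(dag_id, state, start_date, end_date):
    # Mirrors the guards and emission of _emit_duration_stats_for_finished_state.
    if state == "running":
        return
    if start_date is None or end_date is None:
        return  # the real method logs a warning here
    duration = end_date - start_date
    if state == "success":
        Stats.timing(f"dagrun.duration.success.{dag_id}", duration)
    elif state == "failed":
        Stats.timing(f"dagrun.duration.failed.{dag_id}", duration)


start = datetime(2023, 1, 1)
emit_duration("my_dag", "failed", start, start + timedelta(seconds=5))
```

A failed run with both dates set does emit `dagrun.duration.failed.my_dag`, so the metric goes missing only because this method is never reached on the timeout path.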
The method `_emit_duration_stats_for_finished_state` is only called from the `update_state()` method of the `DagRun` class. code If `update_state()` is not called, `_emit_duration_stats_for_finished_state` is never invoked:
```python
if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
    msg = (
        "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
        "run_start_date=%s, run_end_date=%s, run_duration=%s, "
        "state=%s, external_trigger=%s, run_type=%s, "
        "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
    )
    self.log.info(
        msg,
        self.dag_id,
        self.execution_date,
        self.run_id,
        self.start_date,
        self.end_date,
        (self.end_date - self.start_date).total_seconds()
        if self.start_date and self.end_date
        else None,
        self._state,
        self.external_trigger,
        self.run_type,
        self.data_interval_start,
        self.data_interval_end,
        self.dag_hash,
    )

    session.flush()

    self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
    self._emit_duration_stats_for_finished_state()
```
When a DAG run times out, the scheduler job only calls `set_state()`. code
```python
if (
    dag_run.start_date
    and dag.dagrun_timeout
    and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
):
    dag_run.set_state(DagRunState.FAILED)
    unfinished_task_instances = (
        session.query(TI)
        .filter(TI.dag_id == dag_run.dag_id)
        .filter(TI.run_id == dag_run.run_id)
        .filter(TI.state.in_(State.unfinished))
    )
    for task_instance in unfinished_task_instances:
        task_instance.state = TaskInstanceState.SKIPPED
        session.merge(task_instance)
    session.flush()
    self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

    active_runs = dag.get_num_active_runs(only_running=False, session=session)
    # Work out if we should allow creating a new DagRun now?
    if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
        dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

    callback_to_execute = DagCallbackRequest(
        full_filepath=dag.fileloc,
        dag_id=dag.dag_id,
        run_id=dag_run.run_id,
        is_failure_callback=True,
        processor_subdir=dag_model.processor_subdir,
        msg="timed_out",
    )

    dag_run.notify_dagrun_state_changed()
    return callback_to_execute
```
From the code above, we can see that when a DAG run times out, only the `set_state()` method is called. `update_state()` is never called, which is why the metric `dagrun.duration.failed.{self.dag_id}` is not emitted.
Please fix this bug so that the timer `dagrun.duration.failed.<dag_id>` captures DAG runs that fail because of a DAG-level timeout.
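The gap, and one possible shape of a fix, can be shown with a self-contained mock (the class, state strings, and five-second duration are illustrative, not Airflow code; an actual fix would live in the scheduler's timeout branch):

```python
from datetime import datetime, timedelta

emitted = []  # stand-in for the StatsD backend


class MockDagRun:
    """Illustrative stand-in for DagRun; not the real implementation."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.state = "running"
        self.start_date = datetime(2023, 1, 1)
        self.end_date = None

    def set_state(self, state):
        # Like DagRun.set_state(): updates the state and end_date,
        # but emits no duration metric.
        self.state = state
        self.end_date = self.start_date + timedelta(seconds=5)

    def _emit_duration_stats_for_finished_state(self):
        if self.state == "running" or self.start_date is None or self.end_date is None:
            return
        duration = self.end_date - self.start_date
        if self.state == "failed":
            emitted.append((f"dagrun.duration.failed.{self.dag_id}", duration))


run = MockDagRun("my_dag")

# Current timeout path: set_state() alone emits no timer.
run.set_state("failed")
assert emitted == []

# Possible fix: also emit duration stats after marking the run failed.
run._emit_duration_stats_for_finished_state()
assert emitted == [("dagrun.duration.failed.my_dag", timedelta(seconds=5))]
```

In the real code this would mean invoking the duration-stats emission (or the `update_state()` path that wraps it) after `set_state(DagRunState.FAILED)` in the timeout branch.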
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct