
Metrics dagrun.duration.failed.<dag_id> not updated when the dag run failed due to timeout #29013

@yangguoaws

Description

Apache Airflow version

2.5.0

What happened

When a DAG sets the dagrun_timeout parameter and a DAG run fails because it timed out, the dagrun.duration.failed.<dag_id> metric is not emitted.

What you think should happen instead

According to the documentation, dagrun.duration.failed.<dag_id> records the milliseconds taken for a DagRun to reach the failed state. It should therefore capture every kind of DAG run failure, including failures caused by the DAG-level timeout.

How to reproduce

Set the dagrun_timeout parameter (e.g. dagrun_timeout=timedelta(seconds=5)), then add a BashOperator task that runs longer than dagrun_timeout (e.g. bash_command='sleep 120').

Then check the metrics: no dagrun.duration.failed.<dag_id> timer is emitted for the DAG run that failed because of the timeout.
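One way to do that check is to capture the raw StatsD packets (for example with a UDP listener on the StatsD port) and search them for the timer. The sketch below makes several assumptions. StatsD metrics must be enabled, and Airflow's default statsd_prefix of `airflow` must be in use. The captured packets must already be available as text lines. `find_timer_values` and the dag_id `timeout_dag` are hypothetical and not part of Airflow.

```python
from typing import Iterable, List


def find_timer_values(lines: Iterable[str], metric: str, prefix: str = "airflow") -> List[float]:
    """Return every value reported for the StatsD timer ``<prefix>.<metric>``.

    Hypothetical helper. A StatsD line looks like ``name:value|type`` with an
    optional ``|@sample_rate`` suffix; timers use the type ``ms``.
    """
    full_name = f"{prefix}.{metric}" if prefix else metric
    values: List[float] = []
    for line in lines:
        # Split "name:value|type[|@rate]" at the first colon.
        name, sep, rest = line.strip().partition(":")
        if not sep or name != full_name:
            continue
        fields = rest.split("|")
        # Keep only timer ("ms") packets for the requested metric.
        if len(fields) >= 2 and fields[1] == "ms":
            values.append(float(fields[0]))
    return values


captured = [
    "airflow.dagrun.duration.success.other_dag:812.4|ms",
    "airflow.scheduler_heartbeat:1|c",
]
# An empty list for the timed-out DAG is the symptom reported in this issue.
print(find_timer_values(captured, "dagrun.duration.failed.timeout_dag"))
```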

Operating System

Ubuntu 22.04.1 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.1

Deployment

Virtualenv installation

Deployment details

No response

Anything else

As described above, dagrun.duration.failed.<dag_id> is documented to record the milliseconds taken for a DagRun to reach the failed state. However, when a DAG run fails because of the DAG-run-level timeout, the metric is not emitted.

I dug into the Airflow code and found the cause.

The timer dagrun.duration.failed.{self.dag_id} is emitted by the method _emit_duration_stats_for_finished_state:

    def _emit_duration_stats_for_finished_state(self):
        if self.state == State.RUNNING:
            return
        if self.start_date is None:
            self.log.warning("Failed to record duration of %s: start_date is not set.", self)
            return
        if self.end_date is None:
            self.log.warning("Failed to record duration of %s: end_date is not set.", self)
            return

        duration = self.end_date - self.start_date
        if self.state == State.SUCCESS:
            Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
        elif self.state == State.FAILED:
            Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
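The decision logic of this method can be exercised outside Airflow. The sketch below is a simplified, self-contained model under a few assumptions. `RecordingStats` is a hypothetical stand-in for Airflow's `Stats`, and `FakeDagRun` is a hypothetical stand-in for `DagRun`. States are plain strings instead of Airflow's `State` enums, and the warning logs are left out.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


@dataclass
class RecordingStats:
    """Hypothetical stand-in for Airflow's Stats: records timing() calls."""
    calls: List[Tuple[str, timedelta]] = field(default_factory=list)

    def timing(self, name: str, duration: timedelta) -> None:
        self.calls.append((name, duration))


@dataclass
class FakeDagRun:
    """Hypothetical, simplified DagRun holding only what the method reads."""
    dag_id: str
    state: str
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None

    def emit_duration_stats(self, stats: RecordingStats) -> None:
        # Mirrors _emit_duration_stats_for_finished_state (without the warning
        # logs): skip running runs and runs missing either timestamp.
        if self.state == "running" or self.start_date is None or self.end_date is None:
            return
        duration = self.end_date - self.start_date
        if self.state == "success":
            stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
        elif self.state == "failed":
            stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)


start = datetime(2023, 1, 1, 12, 0, 0)
run = FakeDagRun("timeout_dag", "failed", start, start + timedelta(seconds=5))
stats = RecordingStats()
run.emit_duration_stats(stats)
print(stats.calls)
```

The method only looks at state, start_date and end_date. It does not care how the run reached the failed state, so any gap has to come from which code paths call it.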

The function _emit_duration_stats_for_finished_state is only called from the update_state() method of the DagRun class, near the end of that method. If update_state() is not called, the duration timer is never emitted:

        if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
            msg = (
                "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
                "run_start_date=%s, run_end_date=%s, run_duration=%s, "
                "state=%s, external_trigger=%s, run_type=%s, "
                "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
            )
            self.log.info(
                msg,
                self.dag_id,
                self.execution_date,
                self.run_id,
                self.start_date,
                self.end_date,
                (self.end_date - self.start_date).total_seconds()
                if self.start_date and self.end_date
                else None,
                self._state,
                self.external_trigger,
                self.run_type,
                self.data_interval_start,
                self.data_interval_end,
                self.dag_hash,
            )
            session.flush()

        self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
        self._emit_duration_stats_for_finished_state()
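As a result, whether the metric appears depends on which method moves the run into a finished state. The simplified model below makes that visible. `ModelDagRun` is hypothetical and heavily reduced, not Airflow's class. The model assumes that `set_state()` only changes the state and stamps `end_date` when the run finishes. Only `update_state()` then goes on to emit the duration timer.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

FINISHED = {"success", "failed"}


class ModelDagRun:
    """Hypothetical, heavily simplified model of the two DagRun code paths."""

    def __init__(self, dag_id: str, start_date: datetime) -> None:
        self.dag_id = dag_id
        self.state = "running"
        self.start_date = start_date
        self.end_date: Optional[datetime] = None
        self.timings: List[Tuple[str, timedelta]] = []

    def set_state(self, state: str, now: datetime) -> None:
        # Assumption: set_state changes the state and stamps end_date only;
        # it does not emit any duration metric.
        if self.state != state:
            self.state = state
            self.end_date = now if state in FINISHED else None

    def _emit_duration_stats_for_finished_state(self) -> None:
        if self.state in FINISHED and self.end_date is not None:
            duration = self.end_date - self.start_date
            self.timings.append((f"dagrun.duration.{self.state}.{self.dag_id}", duration))

    def update_state(self, final_state: str, now: datetime) -> None:
        # update_state settles the state, then emits the timer at the end.
        self.set_state(final_state, now)
        self._emit_duration_stats_for_finished_state()


t0 = datetime(2023, 1, 1, 12, 0, 0)

normal = ModelDagRun("timeout_dag", t0)
normal.update_state("failed", t0 + timedelta(seconds=30))

timed_out = ModelDagRun("timeout_dag", t0)
timed_out.set_state("failed", t0 + timedelta(seconds=6))  # scheduler timeout path

print(normal.timings)     # one failed timer recorded
print(timed_out.timings)  # []: no timer, although state and end_date are set
```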

When a DAG run times out, the scheduler job only calls set_state() on it:

        if (
            dag_run.start_date
            and dag.dagrun_timeout
            and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
        ):
            dag_run.set_state(DagRunState.FAILED)
            unfinished_task_instances = (
                session.query(TI)
                .filter(TI.dag_id == dag_run.dag_id)
                .filter(TI.run_id == dag_run.run_id)
                .filter(TI.state.in_(State.unfinished))
            )
            for task_instance in unfinished_task_instances:
                task_instance.state = TaskInstanceState.SKIPPED
                session.merge(task_instance)
            session.flush()
            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
            active_runs = dag.get_num_active_runs(only_running=False, session=session)
            # Work out if we should allow creating a new DagRun now?
            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

            callback_to_execute = DagCallbackRequest(
                full_filepath=dag.fileloc,
                dag_id=dag.dag_id,
                run_id=dag_run.run_id,
                is_failure_callback=True,
                processor_subdir=dag_model.processor_subdir,
                msg="timed_out",
            )

            dag_run.notify_dagrun_state_changed()
            return callback_to_execute
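The check at the top of this branch compares start_date against `timezone.utcnow() - dag.dagrun_timeout`. With the reproduction values above (a 5-second dagrun_timeout and a task sleeping for 120 seconds), the condition holds once more than five seconds have passed, long before the task finishes. The standalone function below reproduces only that comparison. `is_dagrun_timed_out` is a hypothetical name, and it takes `now` as an argument instead of calling `timezone.utcnow()`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_dagrun_timed_out(
    start_date: Optional[datetime],
    dagrun_timeout: Optional[timedelta],
    now: datetime,
) -> bool:
    """Same comparison as the scheduler snippet, with `now` passed in explicitly."""
    return bool(start_date and dagrun_timeout and start_date < now - dagrun_timeout)


start = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
timeout = timedelta(seconds=5)

print(is_dagrun_timed_out(start, timeout, start + timedelta(seconds=3)))  # False
print(is_dagrun_timed_out(start, timeout, start + timedelta(seconds=6)))  # True
print(is_dagrun_timed_out(start, None, start + timedelta(seconds=120)))   # False
```

The comparison is strict, so a run is not treated as timed out at exactly 5 seconds, only after that.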

The code above shows that a timed-out DAG run only goes through set_state(). Because update_state() is never called on this path, the dagrun.duration.failed.<dag_id> timer is never emitted.

Please fix this bug so that the dagrun.duration.failed.<dag_id> timer also captures DAG runs that fail because of the DAG-level timeout.
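One possible direction is to emit the duration in the scheduler's timeout branch right after `dag_run.set_state(DagRunState.FAILED)`. This is a sketch, not an actual patch. In the scheduler this could be a call to the existing `dag_run._emit_duration_stats_for_finished_state()`. It relies on the assumption that `set_state()` has already stamped `end_date` by then, which is worth confirming against the source. The self-contained model below shows the intended effect. `ModelDagRun` and `handle_dagrun_timeout` are hypothetical, and states are plain strings.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


class ModelDagRun:
    """Hypothetical, simplified DagRun: set_state() stamps end_date, emits nothing."""

    def __init__(self, dag_id: str, start_date: datetime) -> None:
        self.dag_id = dag_id
        self.state = "running"
        self.start_date = start_date
        self.end_date: Optional[datetime] = None
        self.timings: List[Tuple[str, timedelta]] = []

    def set_state(self, state: str, now: datetime) -> None:
        self.state = state
        self.end_date = now if state in ("success", "failed") else None

    def _emit_duration_stats_for_finished_state(self) -> None:
        if self.state in ("success", "failed") and self.end_date is not None:
            duration = self.end_date - self.start_date
            self.timings.append((f"dagrun.duration.{self.state}.{self.dag_id}", duration))


def handle_dagrun_timeout(dag_run: ModelDagRun, now: datetime) -> None:
    """Hypothetical timeout branch: fail the run, then emit its duration."""
    dag_run.set_state("failed", now)
    # Proposed addition: emit the timer that update_state() would have emitted.
    dag_run._emit_duration_stats_for_finished_state()
    # The real branch would go on to skip unfinished task instances,
    # build the failure callback, and so on.


t0 = datetime(2023, 1, 1, 12, 0, 0)
run = ModelDagRun("timeout_dag", t0)
handle_dagrun_timeout(run, t0 + timedelta(seconds=6))
print(run.timings)
```

Reusing the existing method keeps the metric name and the start_date/end_date guards in one place. The success, failure and timeout paths would then all report durations the same way.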

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata


Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
