From 3749082e214a6ad601c3732d7fcc9090d65bcbd8 Mon Sep 17 00:00:00 2001 From: Chao-Han Tsai Date: Mon, 13 Jul 2020 10:33:15 -0700 Subject: [PATCH] Backfill reset_dagruns set DagRun to NONE state (#9756) GitOrigin-RevId: 7f64f2d00bb6a0302f4b0298ef1f45caa27b31da --- airflow/models/dag.py | 1 + tests/models/test_dag.py | 50 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 69684d3769..e6aafd37b1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1142,6 +1142,7 @@ def clear( self.set_dag_runs_state(session=session, start_date=start_date, end_date=end_date, + state=State.NONE, ) else: count = 0 diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 250560ad56..8207b2457f 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1355,6 +1355,56 @@ def test_create_dagrun_run_type_is_obtained_from_run_id(self): dr = dag.create_dagrun(run_id="custom_is_set_to_manual", state=State.NONE) assert dr.run_type == DagRunType.MANUAL.value + def test_clear_reset_dagruns(self): + dag_id = 'test_clear_dag_reset_dagruns' + self._clean_up(dag_id) + task_id = 't1' + dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1) + t_1 = DummyOperator(task_id=task_id, dag=dag) + + session = settings.Session() + dagrun_1 = dag.create_dagrun( + run_type=DagRunType.BACKFILL_JOB, + state=State.RUNNING, + start_date=DEFAULT_DATE, + execution_date=DEFAULT_DATE, + ) + session.merge(dagrun_1) + + task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING) + session.merge(task_instance_1) + session.commit() + + dag.clear( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=1), + reset_dag_runs=True, + include_subdags=False, + include_parentdag=False, + session=session, + ) + + dagruns = session.query( + DagRun, + ).filter( + DagRun.dag_id == dag_id, + ).all() + + self.assertEqual(len(dagruns), 1) + dagrun = dagruns[0] # type: DagRun + self.assertEqual(dagrun.state, State.NONE) + + task_instances = session.query( + DagRun, + ).filter( + DagRun.dag_id == dag_id, + ).all() + + self.assertEqual(len(task_instances), 1) + task_instance = task_instances[0] # type: TI + self.assertEqual(task_instance.state, State.NONE) + self._clean_up(dag_id) + class TestQueries(unittest.TestCase):