Skip to content

Commit

Permalink
Backfill reset_dagruns set DagRun to NONE state (#9756)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 7f64f2d00bb6a0302f4b0298ef1f45caa27b31da
  • Loading branch information
milton0825 authored and Cloud Composer Team committed Sep 12, 2024
1 parent 8c282c3 commit 3749082
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,7 @@ def clear(
self.set_dag_runs_state(session=session,
start_date=start_date,
end_date=end_date,
state=State.NONE,
)
else:
count = 0
Expand Down
50 changes: 50 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,56 @@ def test_create_dagrun_run_type_is_obtained_from_run_id(self):
dr = dag.create_dagrun(run_id="custom_is_set_to_manual", state=State.NONE)
assert dr.run_type == DagRunType.MANUAL.value

def test_clear_reset_dagruns(self):
dag_id = 'test_clear_dag_reset_dagruns'
self._clean_up(dag_id)
task_id = 't1'
dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1)
t_1 = DummyOperator(task_id=task_id, dag=dag)

session = settings.Session()
dagrun_1 = dag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.RUNNING,
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
)
session.merge(dagrun_1)

task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
session.merge(task_instance_1)
session.commit()

dag.clear(
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
reset_dag_runs=True,
include_subdags=False,
include_parentdag=False,
session=session,
)

dagruns = session.query(
DagRun,
).filter(
DagRun.dag_id == dag_id,
).all()

self.assertEqual(len(dagruns), 1)
dagrun = dagruns[0] # type: DagRun
self.assertEqual(dagrun.state, State.NONE)

task_instances = session.query(
DagRun,
).filter(
DagRun.dag_id == dag_id,
).all()

self.assertEqual(len(task_instances), 1)
task_instance = task_instances[0] # type: TI
self.assertEqual(task_instance.state, State.NONE)
self._clean_up(dag_id)


class TestQueries(unittest.TestCase):

Expand Down

0 comments on commit 3749082

Please sign in to comment.