Check bagged DAG schedule_interval matches timetable (#23113)
This guards against the DAG's timetable or schedule_interval being
changed after it's created. Validation is done by creating a timetable
and checking that its summary matches schedule_interval. The logic is not
bullet-proof, especially if a custom timetable does not provide a useful
summary, but this is the best we can do.
uranusjr authored Jun 8, 2022
1 parent cc35fca commit a1a9a8f
Showing 4 changed files with 79 additions and 7 deletions.
4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -150,6 +150,10 @@ def __str__(self) -> str:
return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"


class AirflowDagInconsistent(AirflowException):
"""Raise when a DAG has inconsistent attributes."""


class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in DAG definition."""

43 changes: 42 additions & 1 deletion airflow/models/dag.py
@@ -63,7 +63,7 @@
from airflow import settings, utils
from airflow.compat.functools import cached_property
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound
from airflow.exceptions import AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, TaskNotFound
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.base import ID_LEN, Base
from airflow.models.dagbag import DagBag
@@ -484,6 +484,47 @@ def __init__(
self._task_group = TaskGroup.create_root(self)
self.validate_schedule_and_params()

def _check_schedule_interval_matches_timetable(self) -> bool:
"""Check ``schedule_interval`` and ``timetable`` match.
This is done as a part of the DAG validation done before it's bagged, to
guard against the DAG's ``timetable`` (or ``schedule_interval``) from
being changed after it's created, e.g.
.. code-block:: python
dag1 = DAG("d1", timetable=MyTimetable())
dag1.schedule_interval = "@once"
dag2 = DAG("d2", schedule_interval="@once")
dag2.timetable = MyTimetable()
Validation is done by creating a timetable and check its summary matches
``schedule_interval``. The logic is not bullet-proof, especially if a
custom timetable does not provide a useful ``summary``. But this is the
best we can do.
"""
if self.schedule_interval == self.timetable.summary:
return True
try:
timetable = create_timetable(self.schedule_interval, self.timezone)
except ValueError:
return False
return timetable.summary == self.timetable.summary

def validate(self):
"""Validate the DAG has a coherent setup.
This is called by the DAG bag before bagging the DAG.
"""
if not self._check_schedule_interval_matches_timetable():
raise AirflowDagInconsistent(
f"inconsistent schedule: timetable {self.timetable.summary!r} "
f"does not match schedule_interval {self.schedule_interval!r}",
)
self.params.validate()
self.timetable.validate()

def __repr__(self):
return f"<DAG: {self.dag_id}>"

13 changes: 7 additions & 6 deletions airflow/models/dagbag.py
@@ -39,6 +39,7 @@
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowDagInconsistent,
AirflowTimetableInvalid,
ParamValidationError,
)
@@ -402,25 +403,25 @@ def _process_modules(self, filepath, mods, file_last_changed_on_disk):
for (dag, mod) in top_level_dags:
dag.fileloc = mod.__file__
try:
dag.timetable.validate()
# validate dag params
dag.params.validate()
dag.validate()
self.bag_dag(dag=dag, root_dag=dag)
found_dags.append(dag)
found_dags += dag.subdags
except AirflowTimetableInvalid as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = f"Invalid timetable expression: {exception}"
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
except (
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
AirflowClusterPolicyViolation,
AirflowDagInconsistent,
ParamValidationError,
) as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = str(exception)
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
else:
found_dags.append(dag)
found_dags += dag.subdags
return found_dags

def bag_dag(self, dag, root_dag):
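With the dagbag change above, a DAG file whose timetable and schedule_interval disagree is no longer bagged; the AirflowDagInconsistent message is recorded as an import error instead. A rough sketch (the dag_folder path is illustrative):

from airflow.models.dagbag import DagBag

# Collect DAGs from a folder; files that fail validation end up in
# import_errors keyed by their file location instead of raising.
dagbag = DagBag(dag_folder="/path/to/dags", include_examples=False)
for fileloc, message in dagbag.import_errors.items():
    print(fileloc, message)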
26 changes: 26 additions & 0 deletions tests/models/test_dag.py
@@ -2243,6 +2243,32 @@ def return_num(num):
assert dag.params['value'] == value


@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_match_schedule_interval(timetable):
dag = DAG("my-dag", timetable=timetable)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
def test_dag_schedule_interval_match_timetable(schedule_interval):
dag = DAG("my-dag", schedule_interval=schedule_interval)
assert dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
def test_dag_schedule_interval_change_after_init(schedule_interval):
dag = DAG("my-dag", timetable=OnceTimetable())
dag.schedule_interval = schedule_interval
assert not dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
def test_dag_timetable_change_after_init(timetable):
dag = DAG("my-dag") # Default is timedelta(days=1).
dag.timetable = timetable
assert not dag._check_schedule_interval_matches_timetable()


@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ('test-run-id', None)])
def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
"""Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""
