Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,17 @@ class AirflowTimetableInvalid(AirflowException):
"""Raise when a DAG has an invalid timetable."""


class DagIsPaused(AirflowException):
"""Raise when a dag is paused and something tries to run it."""

def __init__(self, dag_id: str) -> None:
super().__init__(dag_id)
self.dag_id = dag_id

def __str__(self) -> str:
return f"Dag {self.dag_id} is paused"


class DagNotFound(AirflowNotFoundException):
"""Raise when a DAG is not available in the system."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from airflow.utils.context import Context

if AIRFLOW_V_3_0_PLUS:
from airflow.exceptions import DagIsPaused
from airflow.sdk import BaseOperatorLink
from airflow.sdk.execution_time.xcom import XCom
else:
Expand Down Expand Up @@ -131,6 +132,7 @@ class TriggerDagRunOperator(BaseOperator):
Default is ``[DagRunState.FAILED]``.
:param skip_when_already_exists: Set to true to mark the task as SKIPPED if a DAG run of the triggered
DAG for the same logical date already exists.
:param fail_when_dag_is_paused: If the dag to trigger is paused, DagIsPaused will be raised.
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
"""
Expand Down Expand Up @@ -160,6 +162,7 @@ def __init__(
allowed_states: list[str | DagRunState] | None = None,
failed_states: list[str | DagRunState] | None = None,
skip_when_already_exists: bool = False,
fail_when_dag_is_paused: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
Expand All @@ -179,6 +182,7 @@ def __init__(
else:
self.failed_states = [DagRunState.FAILED]
self.skip_when_already_exists = skip_when_already_exists
self.fail_when_dag_is_paused = fail_when_dag_is_paused
self._defer = deferrable
self.logical_date = logical_date
if logical_date is NOTSET:
Expand Down Expand Up @@ -216,6 +220,14 @@ def execute(self, context: Context):
else:
run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date or timezone.utcnow()) # type: ignore[misc,call-arg]

if self.fail_when_dag_is_paused:
dag_model = DagModel.get_current(self.trigger_dag_id)
if dag_model.is_paused:
if AIRFLOW_V_3_0_PLUS:
raise DagIsPaused(dag_id=self.trigger_dag_id)
else:
raise AirflowException(f"Dag {self.trigger_dag_id} is paused")

if AIRFLOW_V_3_0_PLUS:
self._trigger_dag_af_3(context=context, run_id=run_id, parsed_logical_date=parsed_logical_date)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.exceptions import DagRunTriggerException
from airflow.exceptions import DagIsPaused, DagRunTriggerException

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -253,8 +253,9 @@ def setup_method(self):
f.flush()
self.f_name = f.name

self.dag_model = DagModel(dag_id=TRIGGERED_DAG_ID, fileloc=self._tmpfile)
with create_session() as session:
session.add(DagModel(dag_id=TRIGGERED_DAG_ID, fileloc=self._tmpfile))
session.add(self.dag_model)
session.commit()

def teardown_method(self):
Expand Down Expand Up @@ -734,3 +735,25 @@ def test_dagstatetrigger_run_id_with_clear_and_reset(self, dag_maker):

# The second DagStateTrigger call should still use the original `logical_date` value.
assert mock_task_defer.call_args_list[1].kwargs["trigger"].run_ids == [run_id]

def test_trigger_dagrun_with_fail_when_dag_is_paused(self, dag_maker):
"""Test TriggerDagRunOperator with fail_when_dag_is_paused set to True."""
self.dag_model.set_is_paused(True)

with dag_maker(
TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, serialized=True
):
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
trigger_run_id="dummy_run_id",
reset_dag_run=False,
fail_when_dag_is_paused=True,
)
dag_maker.create_dagrun()
if AIRFLOW_V_3_0_PLUS:
error = DagIsPaused
else:
error = AirflowException
with pytest.raises(error, match=f"^Dag {TRIGGERED_DAG_ID} is paused$"):
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)