Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@
from airflow.models import DagModel, DagRun
from airflow.security import permissions
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.log.action_logger import action_event_from_permission
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from airflow.www.decorators import action_logging

RESOURCE_EVENT_PREFIX = "dag_run"


@security.requires_access(
Expand Down Expand Up @@ -281,6 +285,12 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
],
)
@provide_session
@action_logging(
event=action_event_from_permission(
prefix=RESOURCE_EVENT_PREFIX,
permission=permissions.ACTION_CAN_CREATE,
),
)
def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
"""Trigger a DAG."""
dm = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
Expand Down
4 changes: 3 additions & 1 deletion tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
from tests.test_utils.www import _check_last_log


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -1032,7 +1033,7 @@ class TestPostDagRun(TestDagRunEndpoint):
pytest.param(None, None, None, id="all-missing"),
],
)
def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_date, note):
def test_should_respond_200(self, session, logical_date_field_name, dag_run_id, logical_date, note):
self._create_dag("TEST_DAG_ID")

# We'll patch airflow.utils.timezone.utcnow to always return this so we
Expand Down Expand Up @@ -1077,6 +1078,7 @@ def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_d
"run_type": "manual",
"note": note,
}
_check_last_log(session, dag_id="TEST_DAG_ID", event="dag_run.create", execution_date=None)

def test_should_respond_400_if_a_dag_has_import_errors(self, session):
"""Test that if a dagmodel has import errors, dags won't be triggered"""
Expand Down