Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _trigger_dag(
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)
else:
coerced_logical_date = None
data_interval = None
Expand Down
19 changes: 10 additions & 9 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,16 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:

data_interval_start = post_body.get("data_interval_start")
data_interval_end = post_body.get("data_interval_end")
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
)
data_interval = None
if logical_date:
if data_interval_start and data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(data_interval_start),
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=run_after)

dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
Expand Down
2 changes: 0 additions & 2 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ def check_data_intervals(cls, values):
)
return values

## when logical date is null, the run id should be generated from run_after + random string.
# TODO: AIP83: we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged
@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
Expand Down
18 changes: 8 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,20 +362,18 @@ def trigger_dag_run(

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)

if body.data_interval_start and body.data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(body.data_interval_start),
end=pendulum.instance(body.data_interval_end),
)
else:
if body.logical_date:
data_interval = None
if body.logical_date:
if body.data_interval_start and body.data_interval_end:
data_interval = DataInterval(
start=pendulum.instance(body.data_interval_start),
end=pendulum.instance(body.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or run_after
)
run_after = data_interval.end
else:
data_interval = None

if body.dag_run_id:
run_id = body.dag_run_id
Expand Down
16 changes: 9 additions & 7 deletions airflow/cli/commands/remote_commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, cast

import pendulum

from airflow import settings
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
Expand Down Expand Up @@ -129,28 +127,32 @@ def _get_dag_run(
f"of {logical_date_or_run_id!r} not found"
)

dag_run_logical_date = pendulum.instance(logical_date or timezone.utcnow())
dag_run_logical_date = timezone.coerce_datetime(logical_date)
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
if dag_run_logical_date
else None
)
run_after = data_interval.end if data_interval else timezone.utcnow()
if create_if_necessary == "memory":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = DagRun(
dag_id=dag.dag_id,
run_id=logical_date_or_run_id,
run_type=DagRunType.MANUAL,
external_trigger=True,
logical_date=dag_run_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
triggered_by=DagRunTriggeredByType.CLI,
state=DagRunState.RUNNING,
)
return dag_run, True
elif create_if_necessary == "db":
data_interval = dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date)
dag_run = dag.create_dagrun(
run_id=_generate_temporary_run_id(),
logical_date=dag_run_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _create_backfill_dag_run(
run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, run_after=info.run_after
),
logical_date=info.logical_date,
data_interval=info.data_interval,
data_interval=info.data_interval if info.logical_date else None,
run_after=info.run_after,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1610,7 +1610,7 @@ def test(
"""
Execute one single DagRun for a given DAG and logical date.

:param run_after: the datetime before which to Dag won't run.
:param run_after: the datetime before which to Dag cannot run.
:param logical_date: logical date for the DAG run
:param run_conf: configuration to pass to newly created dagrun
:param conn_file_path: file path to a connection file in either yaml or json
Expand Down Expand Up @@ -1786,6 +1786,9 @@ def create_dagrun(
:meta private:
"""
logical_date = timezone.coerce_datetime(logical_date)
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
raise ValueError("data_interval must be None when logical_date is None")

if data_interval and not isinstance(data_interval, DataInterval):
data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))
Expand Down
5 changes: 4 additions & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,15 @@ def __init__(
dag_version: DagVersion | None = None,
bundle_version: str | None = None,
):
# For manual runs where logical_date is None, ensure no data_interval is set.
if logical_date is None and data_interval is not None:
raise ValueError("data_interval must be None if logical_date is None")

if data_interval is None:
# Legacy: Only happen for runs created prior to Airflow 2.2.
self.data_interval_start = self.data_interval_end = None
else:
self.data_interval_start, self.data_interval_end = data_interval

self.bundle_version = bundle_version
self.dag_id = dag_id
self.run_id = run_id
Expand Down
7 changes: 7 additions & 0 deletions airflow/timetables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,11 @@ def generate_run_id(
data_interval: DataInterval | None,
**extra,
) -> str:
"""
Generate a unique run ID.

:param run_type: The type of DAG run.
:param run_after: the datetime before which to Dag cannot run.
:param data_interval: The data interval of the DAG run.
"""
return run_type.generate_run_id(suffix=run_after.isoformat())
9 changes: 7 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,12 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
"warning",
)

data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
if logical_date:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date or run_after)
run_after = data_interval.end
else:
data_interval = None

if not run_id:
run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
Expand All @@ -2251,7 +2256,7 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION):
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.UI,
Expand Down
11 changes: 9 additions & 2 deletions tests/api_fastapi/core_api/routes/public/test_dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,12 @@ def _dags_for_trigger_tests(self, session=None):
[
("dag_run_5", "test-note", None, None),
("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00"),
(None, None, None, None),
(
None,
None,
None,
None,
),
],
)
def test_should_respond_200(
Expand All @@ -1157,6 +1162,7 @@ def test_should_respond_200(
request_json["data_interval_start"] = data_interval_start
if data_interval_end is not None:
request_json["data_interval_end"] = data_interval_end
request_json["logical_date"] = fixed_now

response = test_client.post(
f"/public/dags/{DAG1_ID}/dagRuns",
Expand All @@ -1174,13 +1180,14 @@ def test_should_respond_200(
if data_interval_start is not None and data_interval_end is not None:
expected_data_interval_start = data_interval_start.replace("+00:00", "Z")
expected_data_interval_end = data_interval_end.replace("+00:00", "Z")
expected_logical_date = fixed_now.replace("+00:00", "Z")

expected_response_json = {
"conf": {},
"dag_id": DAG1_ID,
"dag_run_id": expected_dag_run_id,
"end_date": None,
"logical_date": fixed_now.replace("+00:00", "Z"),
"logical_date": expected_logical_date,
"run_after": fixed_now.replace("+00:00", "Z"),
"external_trigger": True,
"start_date": None,
Expand Down
3 changes: 2 additions & 1 deletion tests/cli/commands/remote_commands/test_asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"data_interval_start": None,
"logical_date": None,
"queued_at": None,
"run_after": None,
"run_after": "2025-02-12T19:27:59.066046Z",
}

assert run_list[0] | undeterministic == undeterministic | {
Expand All @@ -146,4 +146,5 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"start_date": None,
"state": "queued",
"triggered_by": "cli",
"run_after": "2025-02-12T19:27:59.066046Z",
}
12 changes: 5 additions & 7 deletions tests/cli/commands/remote_commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ def test_trigger_dag(self):
"trigger",
"example_bash_operator",
"--run-id=test_trigger_dag",
"--exec-date=2021-06-04T09:00:00+08:00",
'--conf={"foo": "bar"}',
],
),
Expand All @@ -442,13 +441,12 @@ def test_trigger_dag(self):
assert dagrun.external_trigger
assert dagrun.conf == {"foo": "bar"}

# Coerced to UTC.
assert dagrun.logical_date.isoformat(timespec="seconds") == "2021-06-04T01:00:00+00:00"
# logical_date is None as it's not provided
assert dagrun.logical_date is None

# example_bash_operator runs every day at midnight, so the data interval
# should be aligned to the previous day.
assert dagrun.data_interval_start.isoformat(timespec="seconds") == "2021-06-03T00:00:00+00:00"
assert dagrun.data_interval_end.isoformat(timespec="seconds") == "2021-06-04T00:00:00+00:00"
# data_interval is None as logical_date is None
assert dagrun.data_interval_start is None
assert dagrun.data_interval_end is None

def test_trigger_dag_with_microseconds(self):
dag_command.dag_trigger(
Expand Down
9 changes: 3 additions & 6 deletions tests/cli/commands/remote_commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,16 @@ def test_test(self):
# Check that prints, and log messages, are shown
assert "'example_python_operator__print_the_context__20180101'" in stdout.getvalue()

@mock.patch("airflow.utils.timezone.utcnow")
def test_test_no_logical_date(self, mock_utcnow):
def test_test_no_logical_date(self):
"""Test the `airflow test` command"""
now = pendulum.now("UTC")
mock_utcnow.return_value = now
ds = now.strftime("%Y%m%d")
args = self.parser.parse_args(["tasks", "test", "example_python_operator", "print_the_context"])

with redirect_stdout(io.StringIO()) as stdout:
task_command.task_test(args)

# Check that prints, and log messages, are shown
assert f"'example_python_operator__print_the_context__{ds}'" in stdout.getvalue()
assert "example_python_operator" in stdout.getvalue()
assert "print_the_context" in stdout.getvalue()

def test_cli_test_different_path(self, session, tmp_path):
"""
Expand Down
12 changes: 12 additions & 0 deletions tests/www/views/test_views_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ def test_trigger_dag_logical_date_data_interval(admin_client):
assert run.data_interval_end == today_midnight


def test_trigger_dag_logical_date_as_none(admin_client):
test_dag_id = "example_bash_operator"

admin_client.post(f"dags/{test_dag_id}/trigger", data={"conf": "{}"})

with create_session() as session:
run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first()
assert run is not None
assert DagRunType.MANUAL in run.run_id
assert run.run_type == DagRunType.MANUAL


def test_trigger_dag_form(admin_client):
test_dag_id = "example_bash_operator"
resp = admin_client.get(f"dags/{test_dag_id}/trigger")
Expand Down
Loading