51 changes: 30 additions & 21 deletions airflow/api/common/trigger_dag.py
@@ -42,6 +42,7 @@ def _trigger_dag(
dag_bag: DagBag,
*,
triggered_by: DagRunTriggeredByType,
run_after: datetime,
Contributor:

So, e.g. this: not sure it makes sense to have this param.

Contributor @dstandish (Feb 11, 2025):

But basically, for any public function or public endpoint, my thought is that we probably should not expose this to the user as an option/param. Essentially it's a read-only attribute, I'm thinking.

Collaborator (Author):

Created an issue here to fix this: #46650

run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
@@ -54,6 +55,7 @@ def _trigger_dag(
:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param triggered_by: the entity which triggers the dag_run
:param run_after: the datetime before which the DAG run cannot start
:param run_id: ID of the run
:param conf: configuration
:param logical_date: logical date of the run
@@ -65,26 +67,30 @@
if dag is None or dag_id not in dag_bag.dags:
raise DagNotFound(f"Dag id {dag_id} not found")

logical_date = logical_date or timezone.utcnow()

if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")

if replace_microseconds:
logical_date = logical_date.replace(microsecond=0)

if dag.default_args and "start_date" in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and logical_date < min_dag_start_date:
raise ValueError(
f"Logical date [{logical_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)

data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")

if replace_microseconds:
logical_date = logical_date.replace(microsecond=0)

if dag.default_args and "start_date" in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and logical_date < min_dag_start_date:
raise ValueError(
f"Logical date [{logical_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
coerced_logical_date = timezone.coerce_datetime(logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
else:
coerced_logical_date = None
data_interval = None

run_id = run_id or DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=coerced_logical_date,
run_after=timezone.coerce_datetime(run_after),
)

# This intentionally does not use 'session' in the current scope because it
@@ -102,7 +108,7 @@ def _trigger_dag(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
@@ -120,6 +126,7 @@ def trigger_dag(
dag_id: str,
*,
triggered_by: DagRunTriggeredByType,
run_after: datetime | None = None,
run_id: str | None = None,
conf: dict | str | None = None,
logical_date: datetime | None = None,
@@ -131,6 +138,7 @@ def trigger_dag(

:param dag_id: DAG ID
:param triggered_by: the entity which triggers the dag_run
:param run_after: the datetime before which the DAG run cannot start
:param run_id: ID of the dag_run
:param conf: configuration
:param logical_date: date of execution
@@ -147,6 +155,7 @@
dag_id=dag_id,
dag_bag=dagbag,
run_id=run_id,
run_after=run_after or timezone.utcnow(),
conf=conf,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
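For orientation, this is roughly the shape of the DagRun.generate_run_id classmethod that the diff now calls everywhere. The keyword signature is taken from the call sites above; the body and the exact run-id format are assumptions, not the real implementation:

from __future__ import annotations

from datetime import datetime

from airflow.utils.types import DagRunType


def generate_run_id_sketch(
    *,
    run_type: DagRunType,
    logical_date: datetime | None,
    run_after: datetime,
) -> str:
    # Prefer the logical date when one exists (keeps the historical id style);
    # otherwise fall back to run_after so manual runs triggered without a
    # logical date still get a stable, unique identifier.
    return f"{run_type.value}__{(logical_date or run_after).isoformat()}"

With a shape like this, _trigger_dag can produce a run_id even when the caller omits logical_date, which is why run_after is now threaded through trigger_dag with a timezone.utcnow() fallback.
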
9 changes: 6 additions & 3 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -327,7 +327,8 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
except ValidationError as err:
raise BadRequest(detail=str(err))

logical_date = pendulum.instance(post_body["logical_date"])
logical_date = pendulum.instance(post_body["logical_date"]) if post_body.get("logical_date") else None
run_after = pendulum.instance(post_body["run_after"])
run_id = post_body["run_id"]
dagrun_instance = session.scalar(
select(DagRun)
@@ -352,12 +353,14 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
end=pendulum.instance(data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
data_interval = (
dag.timetable.infer_manual_data_interval(run_after=logical_date) if logical_date else None
)
dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
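As an illustration, a request to the stable REST API can now omit logical_date and supply run_after instead. A hypothetical call; the host, credentials, and dag_id are placeholders:

import requests

# Placeholder host/auth/dag_id; adjust for the target deployment.
resp = requests.post(
    "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
    json={
        "run_after": "2025-02-11T12:00:00+00:00",  # logical_date intentionally omitted
        "conf": {"source": "example"},
    },
    auth=("admin", "admin"),
)
resp.raise_for_status()
print(resp.json())  # run_id is generated from run_after when no logical_date is given
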
21 changes: 14 additions & 7 deletions airflow/api_connexion/schemas/dag_run_schema.py
@@ -63,7 +63,8 @@ class Meta:

run_id = auto_field(data_key="dag_run_id")
dag_id = auto_field(dump_only=True)
logical_date = auto_field(data_key="logical_date", validate=validate_istimezone)
logical_date = auto_field(data_key="logical_date", allow_none=True, validate=validate_istimezone)
run_after = auto_field(data_key="run_after", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
@@ -78,17 +79,23 @@ class Meta:

@pre_load
def autogenerate(self, data, **kwargs):
"""Auto generate run_id and logical_date if they are not provided."""
logical_date = data.get("logical_date", _MISSING)
"""Auto generate run_id and run_after if they are not provided."""
run_after = data.get("run_after", _MISSING)

# Auto-generate logical_date if missing
if logical_date is _MISSING:
data["logical_date"] = str(timezone.utcnow())
# Auto-generate run_after if missing
if run_after is _MISSING:
data["run_after"] = str(timezone.utcnow())

if "dag_run_id" not in data:
try:
if logical_date_str := data.get("logical_date"):
logical_date = timezone.parse(logical_date_str)
else:
logical_date = None
data["dag_run_id"] = DagRun.generate_run_id(
DagRunType.MANUAL, timezone.parse(data["logical_date"])
run_type=DagRunType.MANUAL,
logical_date=logical_date,
run_after=timezone.parse(data["run_after"]),
)
except (ParserError, TypeError) as err:
raise BadRequest("Incorrect datetime argument", detail=str(err))
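In effect, the pre_load hook above does the following for a minimal payload. This is a standalone mirror of the same logic for illustration only, and it assumes the DagRun.generate_run_id signature introduced in this PR:

from airflow.models.dagrun import DagRun
from airflow.utils import timezone
from airflow.utils.types import DagRunType

_MISSING = object()


def autogenerate_sketch(data: dict) -> dict:
    # Default run_after to "now" when the client did not send it.
    if data.get("run_after", _MISSING) is _MISSING:
        data["run_after"] = str(timezone.utcnow())
    # Derive dag_run_id from logical_date if present, otherwise from run_after.
    if "dag_run_id" not in data:
        logical_date = timezone.parse(data["logical_date"]) if data.get("logical_date") else None
        data["dag_run_id"] = DagRun.generate_run_id(
            run_type=DagRunType.MANUAL,
            logical_date=logical_date,
            run_after=timezone.parse(data["run_after"]),
        )
    return data


# An empty payload comes back with run_after set to "now" and a "manual__..." run id.
print(autogenerate_sketch({}))
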
8 changes: 5 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,11 +20,11 @@
from datetime import datetime
from enum import Enum

import pendulum
from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models import DagRun
from airflow.utils import timezone
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

@@ -82,9 +82,11 @@ class TriggerDAGRunPostBody(StrictBaseModel):
"""Trigger DAG Run Serializer for POST body."""

dag_run_id: str | None = None
logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None = None
data_interval_end: AwareDatetime | None = None
logical_date: AwareDatetime | None
run_after: datetime = Field(default_factory=timezone.utcnow)

conf: dict = Field(default_factory=dict)
note: str | None = None

@@ -102,7 +104,7 @@ def check_data_intervals(cls, values):
def validate_dag_run_id(self):
if not self.dag_run_id:
self.dag_run_id = DagRun.generate_run_id(
DagRunType.MANUAL, self.logical_date or pendulum.now("UTC")
run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after
)
return self

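A small sketch of how the updated model behaves when a client omits dag_run_id. Note that logical_date has no default, so it must be passed explicitly, but it may be None; the exact generated id format is an assumption:

from airflow.api_fastapi.core_api.datamodels.dag_run import TriggerDAGRunPostBody

# run_after falls back to timezone.utcnow() via its default_factory, and the
# model validator fills dag_run_id from run_type/logical_date/run_after.
body = TriggerDAGRunPostBody(logical_date=None)
print(body.run_after)   # timezone-aware "now"
print(body.dag_run_id)  # e.g. "manual__<run_after>" (format assumed)
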
16 changes: 10 additions & 6 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -10501,12 +10501,6 @@ components:
- type: string
- type: 'null'
title: Dag Run Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
data_interval_start:
anyOf:
- type: string
@@ -10519,6 +10513,16 @@
format: date-time
- type: 'null'
title: Data Interval End
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
run_after:
type: string
format: date-time
title: Run After
conf:
type: object
title: Conf
21 changes: 17 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -347,7 +347,6 @@ def trigger_dag_run(
) -> DAGRunResponse:
"""Trigger a DAG."""
dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
now = pendulum.now("UTC")
if not dm:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")

@@ -359,6 +358,7 @@

logical_date = timezone.coerce_datetime(body.logical_date)
coerced_logical_date = timezone.coerce_datetime(logical_date)
run_after = timezone.coerce_datetime(body.run_after)

try:
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
@@ -369,13 +369,26 @@
end=pendulum.instance(body.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now)
if body.logical_date:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or run_after
)
run_after = data_interval.end
else:
data_interval = None

if body.dag_run_id:
run_id = body.dag_run_id
else:
run_id = DagRun.generate_run_id(
run_type=DagRunType.SCHEDULED, logical_date=coerced_logical_date, run_after=run_after
)

dag_run = dag.create_dagrun(
run_id=cast(str, body.dag_run_id),
run_id=run_id,
logical_date=coerced_logical_date,
data_interval=data_interval,
run_after=data_interval.end,
run_after=run_after,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
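For illustration, triggering through this FastAPI endpoint with an explicit null logical_date might look like this. The base URL, route prefix, and auth header are placeholders and may differ per deployment and version; the payload matches TriggerDAGRunPostBody above:

import requests

# Placeholder URL and auth; adjust for the target deployment.
resp = requests.post(
    "http://localhost:8080/public/dags/example_dag/dagRuns",
    json={
        "logical_date": None,                      # no logical date for this run
        "run_after": "2025-02-11T12:00:00+00:00",  # earliest time the run may start
        "conf": {"source": "example"},
    },
    headers={"Authorization": "Bearer <token>"},
)
resp.raise_for_status()
print(resp.json())
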
8 changes: 3 additions & 5 deletions airflow/jobs/scheduler_job_runner.py
@@ -1286,7 +1286,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
logical_date=dag_model.next_dagrun,
run_after=dag_model.next_dagrun,
data_interval=data_interval,
),
logical_date=dag_model.next_dagrun,
@@ -1394,12 +1394,10 @@ def _create_dag_runs_asset_triggered(

data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events)
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=logical_date,
data_interval=data_interval,
session=session,
events=asset_events,
run_after=max(logical_dates.values()),
),
logical_date=logical_date,
data_interval=data_interval,
8 changes: 4 additions & 4 deletions airflow/models/backfill.py
@@ -286,6 +286,8 @@ def _create_backfill_dag_run(
backfill_sort_ordinal,
session,
):
from airflow.models.dagrun import DagRun

with session.begin_nested():
should_skip_create_backfill = should_create_backfill_dag_run(
info, reprocess_behavior, backfill_id, backfill_sort_ordinal, session
@@ -296,10 +298,8 @@
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
try:
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
logical_date=info.logical_date,
data_interval=info.data_interval,
run_id=DagRun.generate_run_id(
run_type=DagRunType.BACKFILL_JOB, logical_date=info.logical_date, run_after=info.run_after
),
logical_date=info.logical_date,
data_interval=info.data_interval,
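For context, info here is a DagRunInfo coming from the timetable, so each backfilled run id is derived from that run's own logical_date and run_after. A simplified sketch of the idea; the real code also handles dag versions, sessions, and reprocess behavior:

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.utils.types import DagRunType


def backfill_run_ids(dag: DAG, start: datetime, end: datetime) -> list[str]:
    # One run id per DagRunInfo in the window, based on its logical_date and run_after.
    return [
        DagRun.generate_run_id(
            run_type=DagRunType.BACKFILL_JOB,
            logical_date=info.logical_date,
            run_after=info.run_after,
        )
        for info in dag.iter_dagrun_infos_between(start, end)
    ]
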
6 changes: 5 additions & 1 deletion airflow/models/baseoperator.py
@@ -628,7 +628,11 @@ def run(
# This is _mostly_ only used in tests
dr = DagRun(
dag_id=self.dag_id,
run_id=DagRun.generate_run_id(DagRunType.MANUAL, info.logical_date),
run_id=DagRun.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
run_after=info.run_after,
),
run_type=DagRunType.MANUAL,
logical_date=info.logical_date,
data_interval=info.data_interval,