Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
75b0a307d3e928e2ec33cdf599198252d6f0267d6aa8abcdc8eb9c6d065a2bc2
f94e161c1401142bd9e2c2e9ce3eda3fcda3fb203adbfaaaf9c27ac27cd5499b
4,343 changes: 2,180 additions & 2,163 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``134de42d3cb0`` (head) | ``e42d9fcd10d9`` | ``3.2.0`` | Add partition_key to backfill_dag_run. |
| ``6222ce48e289`` (head) | ``134de42d3cb0`` | ``3.2.0`` | Add partition fields to DagModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``134de42d3cb0`` | ``e42d9fcd10d9`` | ``3.2.0`` | Add partition_key to backfill_dag_run. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``e42d9fcd10d9`` | ``f8c9d7e6b5a4`` | ``3.2.0`` | Add allowed_run_types to dag. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
23 changes: 21 additions & 2 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,30 @@ def string_lower_type(val):
)

# next_execution
# --table: print every relevant field of each upcoming execution as a table,
# instead of a single field per line. Mutually exclusive with --field
# (enforced in dag_next_execution, not by argparse).
ARG_TABLE = Arg(
    ("--table",),
    action="store_true",
    default=False,
    # Fixed garbled wording: "a table expected of attributes" -> "a table of expected attributes".
    help="Show a table of expected attributes of next executions",
)
# --field: print exactly one attribute of each upcoming execution.
# Dotted names (data_interval.start/end) are resolved with attrgetter
# by the consuming command. Mutually exclusive with --table.
ARG_FIELD = Arg(
    ("--field",),
    choices=(
        "logical_date",
        "data_interval.start",
        "data_interval.end",
        "partition_key",
        "partition_date",
        "run_after",
    ),
    default=None,
    help="Show given attribute of next executions",
)
ARG_NUM_EXECUTIONS = Arg(
("-n", "--num-executions"),
default=1,
type=positive_int(allow_zero=False),
help="The number of next logical date times to show",
help="The number of next executions to show",
)

# misc
Expand Down Expand Up @@ -1097,7 +1116,7 @@ class GroupCommand(NamedTuple):
"num-executions option is given"
),
func=lazy_load_command("airflow.cli.commands.dag_command.dag_next_execution"),
args=(ARG_DAG_ID, ARG_NUM_EXECUTIONS, ARG_VERBOSE),
args=(ARG_DAG_ID, ARG_TABLE, ARG_FIELD, ARG_NUM_EXECUTIONS, ARG_VERBOSE),
),
ActionCommand(
name="pause",
Expand Down
74 changes: 53 additions & 21 deletions airflow-core/src/airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import ast
import datetime
import errno
import json
import logging
Expand All @@ -41,7 +42,6 @@
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.jobs.job import Job
from airflow.models import DagModel, DagRun, TaskInstance
from airflow.models.dag import get_next_data_interval
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.timetables.base import TimeRestriction
Expand All @@ -55,14 +55,14 @@
from airflow.utils.state import DagRunState

if TYPE_CHECKING:
from collections.abc import Iterable
from collections.abc import Iterable, Iterator

from graphviz.dot import Dot
from sqlalchemy.orm import Session

from airflow import DAG
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.timetables.base import DataInterval
from airflow.timetables.base import DagRunInfo

DAG_DETAIL_FIELDS = {*DAGResponse.model_fields, *DAGResponse.model_computed_fields}

Expand Down Expand Up @@ -310,16 +310,27 @@ def dag_state(args, session: Session = NEW_SESSION) -> None:
@providers_configuration_loaded
def dag_next_execution(args) -> None:
"""
Return the next logical datetime of a DAG at the command line.
Return information of a Dag's next execution at the command line.

>>> airflow dags next-execution tutorial
2018-08-31 10:38:00

# todo: AIP-76 determine what next execution should do for partition-driven dags
# https://github.com/apache/airflow/issues/61076
For a traditional Dag (not using partitions), this prints the logical date
of the next run by default. For a Dag using partitions, the next partition
key is printed. A different field can be printed instead using the CLI flag
``--field``.

A ``--table`` CLI flag can be used instead to print all relevant fields of
the next execution. What fields are considered relevant depends on the
schedule used by the Dag.

    Use ``--num-executions`` to print more than one execution.
"""
from airflow.models.serialized_dag import SerializedDagModel

if args.table and args.field:
raise SystemExit("Cannot use --table and --field together")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit surprised we don't support mutually exclusive by default, and noticed the messages are quite different for such cases. not something we should change in the PR though

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was surprised too


with create_session() as session:
dag = SerializedDagModel.get_dag(args.dag_id, session=session)
last_parsed_dag: DagModel | None = session.scalars(
Expand All @@ -332,27 +343,48 @@ def dag_next_execution(args) -> None:
if last_parsed_dag.is_paused:
print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)

def print_execution_interval(interval: DataInterval | None):
if interval is None:
def iter_next_dagrun_info() -> Iterator[DagRunInfo | None]:
yield (dagrun_info := dag.timetable.next_run_info_from_dag_model(dag_model=last_parsed_dag))
if dagrun_info is None:
return
for _ in range(1, args.num_executions):
dagrun_info = dag.timetable.next_dagrun_info_v2(
last_dagrun_info=dagrun_info,
restriction=TimeRestriction(earliest=None, latest=None, catchup=True),
)
yield dagrun_info
if dagrun_info is None:
break

if args.table:
if last_parsed_dag.timetable_partitioned:
columns = ["partition_key", "partition_date", "run_after"]
else:
columns = ["logical_date", "data_interval.start", "data_interval.end", "run_after"]
getters = [(c, operator.attrgetter(c)) for c in columns]
AirflowConsole().print_as_table([{n: f(o) for n, f in getters} for o in iter_next_dagrun_info()])
return

if args.field:
getter = operator.attrgetter(args.field)
elif last_parsed_dag.timetable_partitioned:
getter = operator.attrgetter("partition_key")
else:
getter = operator.attrgetter("logical_date")

for info in iter_next_dagrun_info():
if info is None:
print(
"[WARN] No following schedule can be found. "
"This DAG may have schedule interval '@once' or `None`.",
file=sys.stderr,
)
print(None)
return
print(interval.start.isoformat())

next_interval = get_next_data_interval(dag.timetable, last_parsed_dag)
print_execution_interval(next_interval)

for _ in range(1, args.num_executions):
next_info = dag.timetable.next_dagrun_info(
last_automated_data_interval=next_interval,
restriction=TimeRestriction(earliest=None, latest=None, catchup=True),
)
next_interval = None if next_info is None else next_info.data_interval
print_execution_interval(next_interval)
else:
value = getter(info)
if isinstance(value, datetime.datetime): # Backward compat in format.
value = value.isoformat()
print(value)


@cli_utils.action_cli
Expand Down
8 changes: 3 additions & 5 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.serialization.enums import Encoding
from airflow.serialization.serialized_objects import BaseSerialization, LazyDeserializedDAG
from airflow.timetables.trigger import CronPartitionTimetable
from airflow.triggers.base import BaseEventTrigger
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
Expand Down Expand Up @@ -178,9 +177,7 @@ def calculate(cls, dag: LazyDeserializedDAG, *, session: Session) -> Self:
if not dag.timetable.can_be_scheduled:
return cls(None, 0)

# todo: AIP-76 what's a more general way to detect?
# https://github.com/apache/airflow/issues/61086
if isinstance(dag.timetable, CronPartitionTimetable):
if dag.timetable.partitioned:
log.info("getting latest run for partitioned dag", dag_id=dag.dag_id)
latest_run = session.scalar(_get_latest_runs_stmt_partitioned(dag_id=dag.dag_id))
else:
Expand Down Expand Up @@ -595,9 +592,10 @@ def update_dags(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
for t in dag.tasks
)
dm.timetable_summary = dag.timetable.summary
dm.timetable_type = dag.timetable.type_name
dm.timetable_summary = dag.timetable.summary
dm.timetable_description = dag.timetable.description
dm.timetable_partitioned = dag.timetable.partitioned
dm.fail_fast = dag.fail_fast if dag.fail_fast is not None else False

allowed_types = dag.allowed_run_types
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1978,12 +1978,12 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -

try:
next_info = serdag.timetable.next_run_info_from_dag_model(dag_model=dag_model)
if TYPE_CHECKING:
assert next_info is not None
data_interval = next_info.data_interval
logical_date = next_info.logical_date
partition_key = next_info.partition_key
run_after = next_info.run_after
# todo: AIP-76 partition date is not passed to dag run
# See https://github.com/apache/airflow/issues/61167.
created_run = serdag.create_dagrun(
run_id=serdag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
Expand All @@ -2000,6 +2000,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
creating_job_id=self.job.id,
session=session,
partition_key=partition_key,
partition_date=next_info.partition_date,
)
active_runs_of_dags[dag_model.dag_id] += 1
dag_model.calculate_dagrun_date_fields(dag=serdag, last_automated_run=created_run)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add partition fields to DagModel.

Revision ID: 6222ce48e289
Revises: 134de42d3cb0
Create Date: 2026-02-25 06:29:58.176890

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.utils.sqlalchemy import UtcDateTime

revision = "6222ce48e289"
down_revision = "134de42d3cb0"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
    """Add partition fields to DagModel."""
    # batch_alter_table so the migration also works on SQLite, which cannot
    # ALTER TABLE ADD COLUMN with constraints in place.
    with op.batch_alter_table("dag", schema=None) as batch_op:
        # server_default="0" (false) backfills existing rows so the NOT NULL
        # constraint is satisfied without a data migration.
        batch_op.add_column(
            sa.Column("timetable_partitioned", sa.Boolean, nullable=False, server_default="0"),
        )
        # Nullable: only set for partitioned timetables that have a next run scheduled.
        batch_op.add_column(sa.Column("next_dagrun_partition_key", sa.String(255)))
        batch_op.add_column(sa.Column("next_dagrun_partition_date", UtcDateTime))
    # dag_run gains a per-run partition date alongside the existing partition_key.
    with op.batch_alter_table("dag_run", schema=None) as batch_op:
        batch_op.add_column(sa.Column("partition_date", UtcDateTime))


def downgrade():
    """Remove partition fields from DagModel."""
    # Drop the columns in the reverse of the order upgrade() added them;
    # also removes dag_run.partition_date added by this revision.
    with op.batch_alter_table("dag", schema=None) as batch_op:
        batch_op.drop_column("timetable_partitioned")
        batch_op.drop_column("next_dagrun_partition_key")
        batch_op.drop_column("next_dagrun_partition_date")
    with op.batch_alter_table("dag_run", schema=None) as batch_op:
        batch_op.drop_column("partition_date")
18 changes: 14 additions & 4 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,14 @@ class DagModel(Base):
)
# Description of the dag
description: Mapped[str | None] = mapped_column(Text, nullable=True)
# Timetable Type
timetable_type: Mapped[str] = mapped_column(String(255), nullable=False, default="")
# Timetable summary
timetable_summary: Mapped[str | None] = mapped_column(Text, nullable=True)
# Timetable description
timetable_description: Mapped[str | None] = mapped_column(String(1000), nullable=True)
# Timetable Type
timetable_type: Mapped[str] = mapped_column(String(255), nullable=False, default="")
# Whether the timetable do partitioning.
timetable_partitioned: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default="0")
# Asset expression based on asset triggers
asset_expression: Mapped[dict[str, Any] | None] = mapped_column(sa.JSON(), nullable=True)
# DAG deadline information
Expand Down Expand Up @@ -413,6 +415,9 @@ class DagModel(Base):
next_dagrun_data_interval_start: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
next_dagrun_data_interval_end: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)

next_dagrun_partition_key: Mapped[str | None] = mapped_column(String(255))
next_dagrun_partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)

# Earliest time at which this ``next_dagrun`` can be created.
next_dagrun_create_after: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)

Expand Down Expand Up @@ -748,18 +753,23 @@ def calculate_dagrun_date_fields(
last_run_info = dag.timetable.run_info_from_dag_run(dag_run=last_automated_run)
next_dagrun_info = dag.next_dagrun_info(last_automated_run_info=last_run_info)
if next_dagrun_info is None:
# there is no next dag run after the last dag run; set to None
# there is no next dag run after the last dag run; set everything to None
self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
self.next_dagrun_partition_key = self.next_dagrun_partition_date = None
else:
self.next_dagrun = next_dagrun_info.logical_date
self.next_dagrun_data_interval = next_dagrun_info.data_interval
self.next_dagrun = next_dagrun_info.logical_date or next_dagrun_info.partition_date
self.next_dagrun_partition_key = next_dagrun_info.partition_key
self.next_dagrun_partition_date = next_dagrun_info.partition_date
self.next_dagrun_create_after = next_dagrun_info.run_after
log.info(
"setting next dagrun info",
next_dagrun=str(self.next_dagrun),
next_dagrun_create_after=str(self.next_dagrun_create_after),
next_dagrun_data_interval_start=str(self.next_dagrun_data_interval_start),
next_dagrun_data_interval_end=str(self.next_dagrun_data_interval_end),
next_dagrun_partition_key=self.next_dagrun_partition_key,
next_dagrun_partition_date=str(self.next_dagrun_partition_date),
)

@provide_session
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ class DagRun(Base, LoggingMixin):
"""

partition_key: Mapped[str | None] = mapped_column(StringID(), nullable=True)
partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
Expand Down Expand Up @@ -330,6 +331,7 @@ def __init__(
backfill_id: NonNegativeInt | None = None,
bundle_version: str | None = None,
partition_key: str | None = None,
partition_date: datetime | None = None,
note: str | None = None,
):
# For manual runs where logical_date is None, ensure no data_interval is set.
Expand Down Expand Up @@ -372,6 +374,7 @@ def __init__(
f"Expected partition_key to be a `str` or `None` but got `{partition_key.__class__.__name__}`"
)
self.partition_key = partition_key
self.partition_date = partition_date
super().__init__()

def __repr__(self):
Expand Down
Loading
Loading