Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class DagRun(StrictBaseModel):
dag_id: str
run_id: str

logical_date: UtcDateTime
logical_date: UtcDateTime | None
data_interval_start: UtcDateTime | None
data_interval_end: UtcDateTime | None
run_after: UtcDateTime
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,8 @@ def get_previous_dagrun(
:param session: SQLAlchemy ORM Session
:param state: the dag run state
"""
if dag_run.logical_date is None:
return None
filters = [
DagRun.dag_id == dag_run.dag_id,
DagRun.logical_date < dag_run.logical_date,
Expand Down
25 changes: 16 additions & 9 deletions docs/apache-airflow/templates-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,9 @@ Variable Type Description
``{{ logical_date }}`` `pendulum.DateTime`_ | A date-time that logically identifies the current DAG run. This value does not contain any semantics, but is simply a value for identification.
| Use ``data_interval_start`` and ``data_interval_end`` instead if you want a value that has real-world semantics,
| such as to get a slice of rows from the database based on timestamps.
``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``.
| Same as ``{{ logical_date | ds }}``.
``{{ ds_nodash }}`` str Same as ``{{ logical_date | ds_nodash }}``.
``{{ exception }}`` None | str | | Error occurred while running task instance.
Exception |
KeyboardInterrupt |
``{{ ts }}`` str | Same as ``{{ logical_date | ts }}``.
| Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` str | Same as ``{{ logical_date | ts_nodash_with_tz }}``.
| Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` str | Same as ``{{ logical_date | ts_nodash }}``.
| Example: ``20180101T000000``.
``{{ prev_data_interval_start_success }}`` `pendulum.DateTime`_ | Start of the data interval of the prior successful :class:`~airflow.models.dagrun.DagRun`.
| ``None`` | Added in version 2.2.
``{{ prev_data_interval_end_success }}`` `pendulum.DateTime`_ | End of the data interval of the prior successful :class:`~airflow.models.dagrun.DagRun`.
Expand Down Expand Up @@ -92,6 +83,22 @@ Variable Type Description
| Added in version 2.4.
=========================================== ===================== ===================================================================

The following are only available when the DagRun has a ``logical_date``

=========================================== ===================== ===================================================================
Variable Type Description
=========================================== ===================== ===================================================================
``{{ ds }}`` str | The DAG run's logical date as ``YYYY-MM-DD``.
| Same as ``{{ logical_date | ds }}``.
``{{ ds_nodash }}`` str Same as ``{{ logical_date | ds_nodash }}``.
``{{ ts }}`` str | Same as ``{{ logical_date | ts }}``.
| Example: ``2018-01-01T00:00:00+00:00``.
``{{ ts_nodash_with_tz }}`` str | Same as ``{{ logical_date | ts_nodash_with_tz }}``.
| Example: ``20180101T000000+0000``.
``{{ ts_nodash }}`` str | Same as ``{{ logical_date | ts_nodash }}``.
| Example: ``20180101T000000``.
=========================================== ===================== ===================================================================

.. note::

The DAG run's logical date, and values derived from it, such as ``ds`` and
Expand Down
2 changes: 2 additions & 0 deletions newsfragments/42404.significant.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ The shift towards using ``run_id`` as the sole identifier for DAG runs eliminate

- Removed ``logical_date`` arguments from public APIs and Python functions related to DAG run lookups.
- ``run_id`` is now the exclusive identifier for DAG runs in these contexts.
- ``ds``, ``ds_nodash``, ``ts``, ``ts_nodash``, ``ts_nodash_with_tz`` (and ``logical_date``) will no longer exist for non-scheduled DAG runs (i.e. manually triggered runs)
- ``task_instance_key_str`` template variable has changed to use ``run_id``, not the logical_date. This means the value of it will change compared to 2.x, even for old runs

* Types of change

Expand Down
5 changes: 4 additions & 1 deletion scripts/ci/pre_commit/template_context_key_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
CONTEXT_HINT = ROOT_DIR.joinpath("task_sdk", "src", "airflow", "sdk", "definitions", "context.py")
TEMPLATES_REF_RST = ROOT_DIR.joinpath("docs", "apache-airflow", "templates-ref.rst")

# These are only conditionally set
IGNORE = {"ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz", "logical_date"}


def _iter_template_context_keys_from_original_return() -> typing.Iterator[str]:
ti_mod = ast.parse(TASKRUNNER_PY.read_text("utf-8"), str(TASKRUNNER_PY))
Expand Down Expand Up @@ -154,7 +157,7 @@ def _compare_keys(retn_keys: set[str], decl_keys: set[str], hint_keys: set[str],
("Context type hint", hint_keys),
("templates-ref", docs_keys),
]
canonical_keys = set.union(*(s for _, s in check_candidates))
canonical_keys = set.union(*(s for _, s in check_candidates)) - IGNORE

def _check_one(identifier: str, keys: set[str]) -> int:
if missing := canonical_keys.difference(keys):
Expand Down
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class DagRun(BaseModel):
)
dag_id: Annotated[str, Field(title="Dag Id")]
run_id: Annotated[str, Field(title="Run Id")]
logical_date: Annotated[datetime, Field(title="Logical Date")]
logical_date: Annotated[datetime | None, Field(title="Logical Date")]
data_interval_start: Annotated[datetime | None, Field(title="Data Interval Start")] = None
data_interval_end: Annotated[datetime | None, Field(title="Data Interval End")] = None
run_after: Annotated[datetime, Field(title="Run After")]
Expand Down
39 changes: 23 additions & 16 deletions task_sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,12 @@ def get_template_context(self) -> Context:
if self._ti_context_from_server:
dag_run = self._ti_context_from_server.dag_run

logical_date = dag_run.logical_date
ds = logical_date.strftime("%Y-%m-%d")
ds_nodash = ds.replace("-", "")
ts = logical_date.isoformat()
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")

context_from_server: Context = {
# TODO: Assess if we need to pass these through timezone.coerce_datetime
"dag_run": dag_run,
"data_interval_end": dag_run.data_interval_end,
"data_interval_start": dag_run.data_interval_start,
"logical_date": logical_date,
"ds": ds,
"ds_nodash": ds_nodash,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{ds_nodash}",
"ts": ts,
"ts_nodash": ts_nodash,
"ts_nodash_with_tz": ts_nodash_with_tz,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{dag_run.run_id}",
"prev_data_interval_start_success": lazy_object_proxy.Proxy(
lambda: get_previous_dagrun_success(self.id).data_interval_start
),
Expand All @@ -160,6 +147,24 @@ def get_template_context(self) -> Context:
}
context.update(context_from_server)

if logical_date := dag_run.logical_date:
ds = logical_date.strftime("%Y-%m-%d")
ds_nodash = ds.replace("-", "")
ts = logical_date.isoformat()
ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
ts_nodash_with_tz = ts.replace("-", "").replace(":", "")
context.update(
{
"logical_date": logical_date,
"ds": ds,
"ds_nodash": ds_nodash,
"task_instance_key_str": f"{self.task.dag_id}__{self.task.task_id}__{ds_nodash}",
"ts": ts,
"ts_nodash": ts_nodash,
"ts_nodash_with_tz": ts_nodash_with_tz,
}
)

return context

def render_templates(
Expand Down Expand Up @@ -500,7 +505,7 @@ def _process_outlets(context: Context, outlets: list[AssetProfile]):
return task_outlets, outlet_events


def run(ti: RuntimeTaskInstance, log: Logger):
def run(ti: RuntimeTaskInstance, log: Logger) -> ToSupervisor | None:
"""Run the task in this process."""
from airflow.exceptions import (
AirflowException,
Expand Down Expand Up @@ -534,7 +539,7 @@ def run(ti: RuntimeTaskInstance, log: Logger):
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
return
return msg
context = ti.get_template_context()
with set_current_context(context):
jinja_env = ti.task.dag.get_template_env()
Expand Down Expand Up @@ -620,6 +625,8 @@ def run(ti: RuntimeTaskInstance, log: Logger):
finally:
if msg:
SUPERVISOR_COMMS.send_request(msg=msg, log=log)
# Return the message to make unit tests easier too
return msg


def _execute_task(context: Context, task: BaseOperator):
Expand Down
3 changes: 2 additions & 1 deletion task_sdk/src/airflow/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ class DagRunProtocol(Protocol):

dag_id: str
run_id: str
logical_date: datetime
logical_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
start_date: datetime
end_date: datetime | None
run_type: Any
run_after: datetime
conf: dict[str, Any] | None
external_trigger: bool

Expand Down
36 changes: 36 additions & 0 deletions task_sdk/tests/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

import contextlib
import json
import os
import uuid
Expand Down Expand Up @@ -1088,6 +1089,41 @@ def execute(self, context):
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
}

@pytest.mark.parametrize(
("logical_date", "check"),
(
pytest.param(None, pytest.raises(KeyError), id="no-logical-date"),
pytest.param(timezone.datetime(2024, 12, 3), contextlib.nullcontext(), id="with-logical-date"),
),
)
def test_no_logical_date_key_error(
self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti, logical_date, check
):
"""Test that a params can be retrieved from context."""

class CustomOperator(BaseOperator):
def execute(self, context):
for key in ("ds", "ds_nodash", "ts", "ts_nodash", "ts_nodash_with_tz"):
with check:
context[key]
# We should always be able to get this
assert context["task_instance_key_str"]

task = CustomOperator(task_id="print-params")
runtime_ti = create_runtime_ti(
dag_id="basic_param_dag",
logical_date=logical_date,
task=task,
conf={
"x": 3,
"text": "Hello World!",
"flag": False,
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
},
)
msg = run(runtime_ti, log=mock.MagicMock())
assert isinstance(msg, SucceedTask)


class TestXComAfterTaskExecution:
@pytest.mark.parametrize(
Expand Down