Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2447,7 +2447,26 @@ scheduler:
type: boolean
example: ~
default: "True"
see_also: ":ref:`Differences between the two cron timetables`"
see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
create_delta_data_intervals:
description: |
Whether to create DAG runs that span an interval or one single point in time when a timedelta or
relativedelta is provided to ``schedule`` argument of a DAG.

* ``True``: **DeltaDataIntervalTimetable** is used, which is suitable for DAGs with well-defined data
interval. You get contiguous intervals from the end of the previous interval up to the scheduled
datetime.
* ``False``: **DeltaTriggerTimetable** is used, which is suitable for DAGs that simply want to say
e.g. "run this every day" and do not care about the data interval.

Notably, for **DeltaTriggerTimetable**, the logical date is the same as the time the DAG Run will
try to schedule, while for **DeltaDataIntervalTimetable**, the logical date is the beginning of
the data interval, but the DAG Run will try to schedule at the end of the data interval.
version_added: 2.11.0
type: boolean
example: ~
default: "True"
see_also: ':ref:`Differences between "trigger" and "data interval" timetables`'
enable_tracemalloc:
description: |
Whether to enable memory allocation tracing in the scheduler. If enabled, Airflow will start
Expand Down
56 changes: 56 additions & 0 deletions airflow/timetables/_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.timezone import convert_to_utc

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
from pendulum import DateTime


class DeltaMixin:
"""Mixin to provide interface to work with timedelta and relativedelta."""

def __init__(self, delta: datetime.timedelta | relativedelta) -> None:
self._delta = delta

@property
def summary(self) -> str:
return str(self._delta)

def validate(self) -> None:
now = datetime.datetime.now()
if (now + self._delta) <= now:
raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

def _get_next(self, current: DateTime) -> DateTime:
return convert_to_utc(current + self._delta)

def _get_prev(self, current: DateTime) -> DateTime:
return convert_to_utc(current - self._delta)

def _align_to_next(self, current: DateTime) -> DateTime:
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
return current
30 changes: 3 additions & 27 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from dateutil.relativedelta import relativedelta
from pendulum import DateTime

from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
from airflow.utils.timezone import coerce_datetime, utcnow

if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
Expand Down Expand Up @@ -173,7 +173,7 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(start=self._get_prev(end), end=end)


class DeltaDataIntervalTimetable(_DataIntervalTimetable):
class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable):
"""
Timetable that schedules data intervals with a time delta.

Expand All @@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
instance.
"""

def __init__(self, delta: Delta) -> None:
self._delta = delta

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
from airflow.serialization.serialized_objects import decode_relativedelta
Expand All @@ -204,10 +201,6 @@ def __eq__(self, other: Any) -> bool:
return NotImplemented
return self._delta == other._delta

@property
def summary(self) -> str:
return str(self._delta)

def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import encode_relativedelta

Expand All @@ -218,23 +211,6 @@ def serialize(self) -> dict[str, Any]:
delta = encode_relativedelta(self._delta)
return {"delta": delta}

def validate(self) -> None:
now = datetime.datetime.now()
if (now + self._delta) <= now:
raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

def _get_next(self, current: DateTime) -> DateTime:
return convert_to_utc(current + self._delta)

def _get_prev(self, current: DateTime) -> DateTime:
return convert_to_utc(current - self._delta)

def _align_to_next(self, current: DateTime) -> DateTime:
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
return current

@staticmethod
def _relativedelta_in_seconds(delta: relativedelta) -> int:
return (
Expand Down
172 changes: 124 additions & 48 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing import TYPE_CHECKING, Any

from airflow.timetables._cron import CronMixin
from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils.timezone import coerce_datetime, utcnow

Expand Down Expand Up @@ -61,7 +62,122 @@ def _deserialize_run_immediately(value: bool | float) -> bool | datetime.timedel
return value


class CronTriggerTimetable(CronMixin, Timetable):
class _TriggerTimetable(Timetable):
_interval: datetime.timedelta | relativedelta

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(
coerce_datetime(run_after - self._interval),
run_after,
)

def _calc_first_run(self) -> DateTime:
"""
If no start_time is set, determine the start.

If True, always prefer past run, if False, never. If None, if within 10% of next run,
if timedelta, if within that timedelta from past run.
"""
raise NotImplementedError()

def _align_to_next(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _align_to_prev(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _get_next(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def _get_prev(self, current: DateTime) -> DateTime:
raise NotImplementedError()

def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if restriction.catchup:
if last_automated_data_interval is not None:
next_start_time = self._get_next(last_automated_data_interval.end)
elif restriction.earliest is None:
next_start_time = self._calc_first_run()
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
start_time_candidates.append(self._calc_first_run())
if restriction.earliest is not None:
start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(
coerce_datetime(next_start_time - self._interval),
next_start_time,
)


class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable):
"""
Timetable that triggers DAG runs according to a cron expression.

This is different from ``DeltaDataIntervalTimetable``, where the delta value
specifies the *data interval* of a DAG run. With this timetable, the data
intervals are specified independently. Also for the same reason, this
timetable kicks off a DAG run immediately at the start of the period,
instead of needing to wait for one data interval to pass.

:param delta: How much time to wait between each run.
:param interval: The data interval of each run. Default is 0.

*run_immediately* controls, if no *start_time* is given to the DAG, when
the first run of the DAG should be scheduled. It has no effect if there
already exist runs for this DAG.

* If *True*, always run immediately the most recent possible DAG run.
* If *False*, wait to run until the next scheduled time in the future.
* If passed a ``timedelta``, will run the most recent possible DAG run
if that run's ``data_interval_end`` is within timedelta of now.
* If *None*, the timedelta is calculated as 10% of the time between the
most recent past scheduled time and the next scheduled time. E.g. if
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
"""

def __init__(
self,
delta: datetime.timedelta | relativedelta,
*,
interval: datetime.timedelta | relativedelta = datetime.timedelta(),
) -> None:
super().__init__(delta)
self._interval = interval

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
return cls(
_deserialize_interval(data["delta"]),
interval=_deserialize_interval(data["interval"]),
)

def serialize(self) -> dict[str, Any]:
return {
"delta": _serialize_interval(self._delta),
"interval": _serialize_interval(self._interval),
}

def _calc_first_run(self) -> DateTime:
return self._align_to_prev(coerce_datetime(utcnow()))


class CronTriggerTimetable(CronMixin, _TriggerTimetable):
"""
Timetable that triggers DAG runs according to a cron expression.

Expand Down Expand Up @@ -103,7 +219,7 @@ def __init__(
) -> None:
super().__init__(cron, timezone)
self._interval = interval
self.run_immediately = run_immediately
self._run_immediately = run_immediately

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
Expand All @@ -123,50 +239,10 @@ def serialize(self) -> dict[str, Any]:
"expression": self._expression,
"timezone": encode_timezone(self._timezone),
"interval": _serialize_interval(self._interval),
"run_immediately": _serialize_run_immediately(self.run_immediately),
"run_immediately": _serialize_run_immediately(self._run_immediately),
}

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
run_after - self._interval, # type: ignore[arg-type]
run_after,
)

def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if restriction.catchup:
if last_automated_data_interval is not None:
next_start_time = self._get_next(last_automated_data_interval.end)
elif restriction.earliest is None:
next_start_time = self._calc_first_run()
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
elif restriction.earliest is None:
# Run immediately has no effect if there is restriction on earliest
start_time_candidates.append(self._calc_first_run())
if restriction.earliest is not None:
start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(
# pendulum.Datetime ± timedelta should return pendulum.Datetime
# however mypy decide that output would be datetime.datetime
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)

def _calc_first_run(self):
def _calc_first_run(self) -> DateTime:
"""
If no start_time is set, determine the start.

Expand All @@ -176,13 +252,13 @@ def _calc_first_run(self):
now = coerce_datetime(utcnow())
past_run_time = self._align_to_prev(now)
next_run_time = self._align_to_next(now)
if self.run_immediately is True: # not truthy, actually set to True
if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true.
return past_run_time

gap_between_runs = next_run_time - past_run_time
gap_to_past = now - past_run_time
if isinstance(self.run_immediately, datetime.timedelta):
buffer_between_runs = self.run_immediately
if isinstance(self._run_immediately, datetime.timedelta):
buffer_between_runs = self._run_immediately
else:
buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5))
if gap_to_past <= buffer_between_runs:
Expand Down Expand Up @@ -238,7 +314,7 @@ def serialize(self) -> dict[str, Any]:
"expressions": [t._expression for t in self._timetables],
"timezone": encode_timezone(timetable._timezone),
"interval": _serialize_interval(timetable._interval),
"run_immediately": _serialize_run_immediately(timetable.run_immediately),
"run_immediately": _serialize_run_immediately(timetable._run_immediately),
}

@property
Expand Down
Loading
Loading