Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions task-sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
from airflow.sdk.definitions._internal.node import validate_key
from airflow.sdk.definitions._internal.types import NOTSET
from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet
from airflow.sdk.definitions.asset import AssetAll, BaseAsset
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.param import DagParam, ParamsDict
Expand Down Expand Up @@ -1014,7 +1014,7 @@ def _validate_owner_links(self, _, owner_links):
def test(
self,
run_after: datetime | None = None,
logical_date: datetime | None = None,
logical_date: datetime | None | ArgNotSet = NOTSET,
run_conf: dict[str, Any] | None = None,
conn_file_path: str | None = None,
variable_file_path: str | None = None,
Expand Down Expand Up @@ -1082,6 +1082,10 @@ def add_logger_if_needed(ti: TaskInstance):

with exit_stack:
self.validate()

# Allow users to explicitly pass None. If it isn't set, we default to current time.
logical_date = logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow()

log.debug("Clearing existing task instances for logical date %s", logical_date)
# TODO: Replace with calling client.dag_run.clear in Execution API at some point
SchedulerDAG.clear_dags(
Expand Down