Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/alibaba/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
"oss2>=2.14.0",
"alibabacloud_adb20211201>=1.0.0",
"alibabacloud_tea_openapi>=0.3.7",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import BaseOperator
from airflow.providers.common.compat.sdk import BaseOperator, TaskInstanceKey
from airflow.sdk import Context


Expand Down
2 changes: 1 addition & 1 deletion providers/amazon/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
"apache-airflow-providers-common-sql>=1.27.0",
"apache-airflow-providers-http",
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.sdk import Context


Expand Down
2 changes: 1 addition & 1 deletion providers/celery/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
# The Celery is known to introduce problems when upgraded to a MAJOR version. Airflow Core
# Uses Celery for CeleryExecutor, and we also know that Kubernetes Python client follows SemVer
# (https://docs.celeryq.dev/en/stable/contributing.html?highlight=semver#versions).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@
from airflow.executors import workloads
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout
from airflow.providers.common.compat.sdk import AirflowException, AirflowTaskTimeout, TaskInstanceKey
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.state import State

Expand Down
2 changes: 1 addition & 1 deletion providers/cncf/kubernetes/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ requires-python = ">=3.10"
dependencies = [
"aiofiles>=23.2.0",
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.10.1",
"apache-airflow-providers-common-compat>=1.10.1", # use next version
"asgiref>=3.5.2",
# TODO(potiuk): We should bump cryptography to >=46.0.0 when sqlalchemy>=2.0 is required
"cryptography>=41.0.0,<46.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:

# Compat: Look up the run_id from the TI table!
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.settings import Session

logical_date_key = get_logical_date_key()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
)
from airflow.sdk.execution_time.timeout import timeout as timeout
from airflow.sdk.execution_time.xcom import XCom as XCom
from airflow.sdk.types import TaskInstanceKey as TaskInstanceKey


from airflow.providers.common.compat._compat_utils import create_module_getattr
Expand Down Expand Up @@ -185,6 +186,7 @@
# Operator Links & Task Groups
# ============================================================================
"BaseOperatorLink": ("airflow.sdk", "airflow.models.baseoperatorlink"),
"TaskInstanceKey": ("airflow.sdk.types", "airflow.models.taskinstancekey"),
"TaskGroup": ("airflow.sdk", "airflow.utils.task_group"),
# ============================================================================
# Operator Utilities (chain, cross_downstream, etc.)
Expand Down
2 changes: 1 addition & 1 deletion providers/databricks/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
"apache-airflow-providers-common-sql>=1.27.0",
"requests>=2.32.0,<3",
"databricks-sql-connector>=4.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from airflow.providers.databricks.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.providers.databricks.operators.databricks_workflow import (
DatabricksWorkflowTaskGroup,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def xcom_key(self) -> str:
"""XCom key where the link is stored during task execution."""
return "databricks_job_run_link"

def get_link(
def get_link( # type: ignore[override] # Signature intentionally kept this way for Airflow 2.x compatibility
self,
operator: BaseOperator,
dttm=None,
Expand Down Expand Up @@ -374,7 +374,7 @@ class WorkflowJobRepairAllFailedLink(BaseOperatorLink, LoggingMixin):

name = "Repair All Failed Tasks"

def get_link(
def get_link( # type: ignore[override] # Signature intentionally kept this way for Airflow 2.x compatibility
self,
operator,
dttm=None,
Expand Down Expand Up @@ -471,7 +471,7 @@ class WorkflowJobRepairSingleTaskLink(BaseOperatorLink, LoggingMixin):

name = "Repair a single task"

def get_link(
def get_link( # type: ignore[override] # Signature intentionally kept this way for Airflow 2.x compatibility
self,
operator,
dttm=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
)

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.providers.edge3.models.edge_worker import EdgeWorkerState
from airflow.utils.state import TaskInstanceState

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
from sqlalchemy.orm import Mapped

from airflow.models.base import Base, StringID
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.common.compat.sdk import TaskInstanceKey, timezone
from airflow.providers.common.compat.sqlalchemy.orm import mapped_column
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import UtcDateTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pydantic import BaseModel, Field

from airflow.executors.workloads import ExecuteTask # noqa: TCH001
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.providers.edge3.models.edge_worker import EdgeWorkerState # noqa: TCH001


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.configuration import conf
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.providers.edge3.models.edge_logs import EdgeLogsModel
from airflow.providers.edge3.worker_api.auth import jwt_token_authorization_rest
from airflow.providers.edge3.worker_api.datamodels import PushLogsBody, WorkerApiDocs
Expand Down
2 changes: 1 addition & 1 deletion providers/google/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
"apache-airflow-providers-common-sql>=1.27.0",
"asgiref>=3.5.2",
"dill>=0.2.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS, BaseOperator

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator

BASE_LINK = "https://console.cloud.google.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
from airflow.providers.google.cloud.links.base import BASE_LINK, BaseGoogleLink

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey
from airflow.providers.google.version_compat import BaseOperator


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
if TYPE_CHECKING:
from google.protobuf.field_mask_pb2 import FieldMask

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey
from airflow.providers.google.version_compat import BaseOperator

BASE_LINK = "https://console.cloud.google.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey
from airflow.providers.google.version_compat import BaseOperator


Expand Down
2 changes: 1 addition & 1 deletion providers/microsoft/azure/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
dependencies = [
"apache-airflow>=2.11.0",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
"adlfs>=2023.10.0",
"azure-batch>=8.0.0",
"azure-cosmos>=4.6.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.sdk import Context


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
if TYPE_CHECKING:
from msgraph_core import APIVersion

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.sdk import Context


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
if TYPE_CHECKING:
from azure.synapse.spark.models import SparkBatchJobOptions

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import TaskInstanceKey
from airflow.sdk import Context


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey


class DagIsPaused(AirflowException):
Expand Down Expand Up @@ -89,8 +88,17 @@ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
trigger_dag_id = operator.trigger_dag_id
if not AIRFLOW_V_3_0_PLUS:
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.taskinstancekey import TaskInstanceKey as CoreTaskInstanceKey

core_ti_key = CoreTaskInstanceKey(
dag_id=ti_key.dag_id,
task_id=ti_key.task_id,
run_id=ti_key.run_id,
try_number=ti_key.try_number,
map_index=ti_key.map_index,
)

if template_fields := RenderedTaskInstanceFields.get_templated_fields(ti_key):
if template_fields := RenderedTaskInstanceFields.get_templated_fields(core_ti_key):
trigger_dag_id: str = template_fields.get("trigger_dag_id", operator.trigger_dag_id) # type: ignore[no-redef]

# Fetch the correct dag_run_id for the triggerED dag which is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import Context
from airflow.providers.common.compat.sdk import Context, TaskInstanceKey


class ExternalDagLink(BaseOperatorLink):
Expand All @@ -83,8 +82,17 @@ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:

if not AIRFLOW_V_3_0_PLUS:
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.taskinstancekey import TaskInstanceKey as CoreTaskInstanceKey

core_ti_key = CoreTaskInstanceKey(
dag_id=ti_key.dag_id,
task_id=ti_key.task_id,
run_id=ti_key.run_id,
try_number=ti_key.try_number,
map_index=ti_key.map_index,
)

if template_fields := RenderedTaskInstanceFields.get_templated_fields(ti_key):
if template_fields := RenderedTaskInstanceFields.get_templated_fields(core_ti_key):
external_dag_id: str = template_fields.get("external_dag_id", operator.external_dag_id) # type: ignore[no-redef]

if AIRFLOW_V_3_0_PLUS:
Expand Down
2 changes: 1 addition & 1 deletion providers/yandex/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dependencies = [
"apache-airflow>=2.11.0",
"yandexcloud>=0.308.0; python_version < '3.13'",
"yandex-query-client>=0.1.4; python_version < '3.13'",
"apache-airflow-providers-common-compat>=1.12.0",
"apache-airflow-providers-common-compat>=1.12.0", # use next version
]

[dependency-groups]
Expand Down
3 changes: 1 addition & 2 deletions providers/yandex/src/airflow/providers/yandex/links/yq.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from airflow.providers.common.compat.sdk import BaseOperatorLink, XCom

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.common.compat.sdk import BaseOperator, Context
from airflow.providers.common.compat.sdk import BaseOperator, Context, TaskInstanceKey

XCOM_WEBLINK_KEY = "web_link"

Expand Down
2 changes: 1 addition & 1 deletion task-sdk/src/airflow/sdk/bases/operatorlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import attrs

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.sdk import BaseOperator
from airflow.sdk.types import TaskInstanceKey


@attrs.define()
Expand Down
35 changes: 34 additions & 1 deletion task-sdk/src/airflow/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Protocol, TypeAlias
from typing import TYPE_CHECKING, Any, NamedTuple, Protocol, TypeAlias

from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet
Expand All @@ -39,6 +39,39 @@
Operator: TypeAlias = BaseOperator | MappedOperator


class TaskInstanceKey(NamedTuple):
"""Key used to identify task instance."""

dag_id: str
task_id: str
run_id: str
try_number: int = 1
map_index: int = -1

@property
def primary(self) -> tuple[str, str, str, int]:
"""Return task instance primary key part of the key."""
return self.dag_id, self.task_id, self.run_id, self.map_index

def with_try_number(self, try_number: int) -> TaskInstanceKey:
"""Return TaskInstanceKey with provided ``try_number``."""
return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, try_number, self.map_index)

@property
def key(self) -> TaskInstanceKey:
"""
For API-compatibility with TaskInstance.

Returns self
"""
return self

@classmethod
def from_dict(cls, dictionary):
"""Create TaskInstanceKey from dictionary."""
return cls(**dictionary)


class DagRunProtocol(Protocol):
"""Minimal interface for a Dag run available during the execution."""

Expand Down
Loading