Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,13 @@
from typing import TYPE_CHECKING, ClassVar

from airflow.providers.amazon.aws.utils.suppress import return_on_error
from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.providers.amazon.version_compat import BaseOperatorLink, XCom

if TYPE_CHECKING:
from airflow.models import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperatorLink
from airflow.sdk.execution_time.xcom import XCom
else:
from airflow.models import XCom # type: ignore[no-redef]
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]


BASE_AWS_CONSOLE_LINK = "https://console.{aws_domain}"

Expand Down Expand Up @@ -94,8 +87,7 @@ def persist(
if not operator.do_xcom_push:
return

operator.xcom_push(
context,
context["ti"].xcom_push(
key=cls.key,
value={
"region_name": region_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

from collections.abc import Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.utils.mixins import (
AwsBaseHookMixin,
AwsHookParams,
AwsHookType,
aws_template_fields,
)
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.utils.types import NOTSET, ArgNotSet


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def execute(self, context):
self._start_task()

if self.do_xcom_push:
self.xcom_push(context, key="ecs_task_arn", value=self.arn)
context["ti"].xcom_push(key="ecs_task_arn", value=self.arn)

if self.deferrable:
self.defer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def execute(self, context: Context) -> list[GetStatementResultResponseTypeDef] |
self.statement_id: str = query_execution_output.statement_id

if query_execution_output.session_id:
self.xcom_push(context, key="session_id", value=query_execution_output.session_id)
context["ti"].xcom_push(key="session_id", value=query_execution_output.session_id)

if self.deferrable and self.wait_for_completion:
is_finished: bool = self.hook.check_query_is_finished(self.statement_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio import (
SageMakerNotebookHook,
)
Expand All @@ -34,6 +33,7 @@
from airflow.providers.amazon.aws.triggers.sagemaker_unified_studio import (
SageMakerNotebookJobTrigger,
)
from airflow.providers.amazon.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,7 @@
AwsHookType,
aws_template_fields,
)
from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

from airflow.providers.amazon.version_compat import BaseSensorOperator
from airflow.utils.types import NOTSET, ArgNotSet


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@
from airflow.providers.amazon.aws.hooks.sagemaker_unified_studio import (
SageMakerNotebookHook,
)
from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseSensorOperator
else:
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]
from airflow.providers.amazon.version_compat import BaseSensorOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator

try:
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from collections.abc import Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.utils.types import NOTSET, ArgNotSet


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.exasol.hooks.exasol import ExasolHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.ftp.hooks.ftp import FTPHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from packaging.version import Version

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.models.xcom import MAX_XCOM_SIZE, XCOM_RETURN_KEY
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.google.common.hooks.discovery_api import GoogleDiscoveryApiHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.http.hooks.http import HttpHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.imap.hooks.imap import ImapHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from bson import json_util

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.mongo.hooks.mongo import MongoHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
from botocore.exceptions import ClientError, WaiterError

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.providers.amazon.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.ftp.hooks.ftp import FTPHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.version_compat import BaseOperator

if TYPE_CHECKING:
import pandas as pd
Expand Down
10 changes: 10 additions & 0 deletions providers/amazon/src/airflow/providers/amazon/version_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,13 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:


AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)

if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator, BaseOperatorLink, BaseSensorOperator
from airflow.sdk.execution_time.xcom import XCom
else:
from airflow.models import BaseOperator, XCom # type: ignore[no-redef]
from airflow.models.baseoperatorlink import BaseOperatorLink # type: ignore[no-redef]
from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef]

__all__ = ["AIRFLOW_V_3_0_PLUS", "BaseOperator", "BaseOperatorLink", "BaseSensorOperator", "XCom"]
Loading