92 changes: 92 additions & 0 deletions airflow/providers/amazon/CHANGELOG.rst
@@ -32,11 +32,103 @@ Main
Breaking changes
~~~~~~~~~~~~~~~~

.. warning::
All deprecated classes, parameters and features have been removed from the Amazon provider package.
The following breaking changes were introduced:

* Hooks

* Removed ``sleep_time`` parameter from ``AthenaHook``. Pass the delay to the ``poll_query_status`` method instead (see the sketch after this list)
* Removed ``BaseAsyncSessionFactory``
* Removed ``AwsBaseAsyncHook``
* Removed ``start_from_head`` parameter from ``AwsLogsHook.get_log_events`` method
* Removed ``sts_hook`` property from ``QuickSightHook``
* Removed ``RedshiftAsyncHook``
* Removed S3 connection type. Please use ``aws`` as ``conn_type`` instead, and specify ``bucket_name`` in ``service_config.s3`` within ``extras``
* Removed ``wait_for_completion``, ``check_interval`` and ``verbose`` parameters from ``SageMakerHook.start_pipeline`` method
* Removed ``wait_for_completion``, ``check_interval`` and ``verbose`` parameters from ``SageMakerHook.stop_pipeline`` method
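
A minimal migration sketch for the ``AthenaHook`` change above; the query execution ID is a placeholder:

.. code-block:: python

    from airflow.providers.amazon.aws.hooks.athena import AthenaHook

    hook = AthenaHook(aws_conn_id="aws_default")  # sleep_time is no longer accepted here
    # Pass the polling delay to poll_query_status instead
    hook.poll_query_status("query-execution-id", sleep_time=10, max_polling_attempts=120)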

* Operators

* Removed ``source`` parameter from ``AppflowRunOperator``
* Removed ``overrides`` parameter from ``BatchOperator``. Use ``container_overrides`` instead (see the sketch after this list)
* Removed ``status_retries`` parameter from ``BatchCreateComputeEnvironmentOperator``
* Removed ``get_hook`` method from ``DataSyncOperator``. Use ``hook`` property instead
* Removed ``wait_for_completion``, ``waiter_delay`` and ``waiter_max_attempts`` parameters from ``EcsDeregisterTaskDefinitionOperator``; deregistering a task definition completes immediately, so there is nothing to wait for
* Removed ``wait_for_completion``, ``waiter_delay`` and ``waiter_max_attempts`` parameters from ``EcsRegisterTaskDefinitionOperator``; registering a task definition completes immediately, so there is nothing to wait for
* Removed ``eks_hook`` property from ``EksCreateClusterOperator``. Use ``hook`` property instead
* Removed ``pod_context``, ``pod_username`` and ``is_delete_operator_pod`` parameters from ``EksPodOperator``
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrStartNotebookExecutionOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrStopNotebookExecutionOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``max_tries`` parameter from ``EmrContainerOperator``. Use ``max_polling_attempts`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrCreateJobFlowOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrServerlessCreateApplicationOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrServerlessStartJobOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrServerlessStopApplicationOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``waiter_countdown`` and ``waiter_check_interval_seconds`` parameters from ``EmrServerlessDeleteApplicationOperator``. Please use ``waiter_max_attempts`` and ``waiter_delay`` instead
* Removed ``delay`` parameter from ``GlueDataBrewStartJobOperator``. Use ``waiter_delay`` instead
* Removed ``hook_params`` parameter from ``RdsBaseOperator``
* Removed ``increment`` as a possible value of the ``action_if_job_exists`` parameter of ``SageMakerProcessingOperator``
* Removed ``increment`` as a possible value of the ``action_if_job_exists`` parameter of ``SageMakerTransformOperator``
* Removed ``increment`` as a possible value of the ``action_if_job_exists`` parameter of ``SageMakerTrainingOperator``
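
The operator renames follow the same shape; a sketch of the ``BatchOperator`` change, with placeholder job values:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.batch import BatchOperator

    submit_job = BatchOperator(
        task_id="submit_batch_job",
        job_name="example-job",  # placeholder values
        job_queue="example-queue",
        job_definition="example-definition",
        # Before: overrides={"command": ["echo", "hello"]}
        container_overrides={"command": ["echo", "hello"]},
    )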

* Secrets

* Removed ``full_url_mode`` and ``are_secret_values_urlencoded`` as possible keys in ``kwargs`` from ``SecretsManagerBackend``

* Sensors

* Removed ``get_hook`` method from ``BatchSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``DmsTaskBaseSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``EmrBaseSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``GlueCatalogPartitionSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``GlueCrawlerSensor``. Use ``hook`` property instead
* Removed ``quicksight_hook`` property from ``QuickSightSensor``. Use ``QuickSightSensor.hook`` instead
* Removed ``sts_hook`` property from ``QuickSightSensor``
* Removed ``get_hook`` method from ``RedshiftClusterSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``S3KeySensor``. Use ``hook`` property instead (see the sketch after this list)
* Removed ``get_hook`` method from ``SageMakerBaseSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``SqsSensor``. Use ``hook`` property instead
* Removed ``get_hook`` method from ``StepFunctionExecutionSensor``. Use ``hook`` property instead
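
All ``get_hook`` removals migrate the same way; a sketch with ``S3KeySensor``, using a placeholder bucket and key:

.. code-block:: python

    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    sensor = S3KeySensor(task_id="wait_for_key", bucket_key="s3://my-bucket/my-key")
    # Before: client = sensor.get_hook().get_conn()
    client = sensor.hook.get_conn()  # the cached hook property replaces get_hook()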

* Transfers

* Removed ``aws_conn_id`` parameter from ``AwsToAwsBaseOperator``. Use ``source_aws_conn_id`` instead
* Removed ``bucket`` and ``delimiter`` parameters from ``GCSToS3Operator``. Use ``gcs_bucket`` instead of ``bucket`` (see the sketch below)
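
A sketch of the ``GCSToS3Operator`` rename, with placeholder bucket names:

.. code-block:: python

    from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator

    copy_files = GCSToS3Operator(
        task_id="gcs_to_s3",
        # Before: bucket="my-gcs-bucket"
        gcs_bucket="my-gcs-bucket",
        dest_s3_key="s3://my-s3-bucket/prefix/",
    )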

* Triggers

* Removed ``BatchOperatorTrigger``. Use ``BatchJobTrigger`` instead
* Removed ``BatchSensorTrigger``. Use ``BatchJobTrigger`` instead
* Removed ``region`` parameter from ``EksCreateFargateProfileTrigger``. Use ``region_name`` instead
* Removed ``region`` parameter from ``EksDeleteFargateProfileTrigger``. Use ``region_name`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``EmrCreateJobFlowTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``EmrTerminateJobFlowTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` parameter from ``EmrContainerTrigger``. Use ``waiter_delay`` instead (see the sketch after this list)
* Removed ``poll_interval`` parameter from ``GlueCrawlerCompleteTrigger``. Use ``waiter_delay`` instead
* Removed ``delay`` and ``max_attempts`` parameters from ``GlueDataBrewJobCompleteTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``RdsDbInstanceTrigger``. Use the dedicated RDS triggers instead, such as ``RdsDbDeletedTrigger``, ``RdsDbStoppedTrigger`` or ``RdsDbAvailableTrigger``
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``RedshiftCreateClusterTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``RedshiftPauseClusterTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``RedshiftCreateClusterSnapshotTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``RedshiftResumeClusterTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``poll_interval`` and ``max_attempts`` parameters from ``RedshiftDeleteClusterTrigger``. Use ``waiter_delay`` and ``waiter_max_attempts`` instead
* Removed ``SageMakerTrainingPrintLogTrigger``. Use ``SageMakerTrigger`` instead
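
The trigger parameter renames all follow one pattern; a sketch with ``EmrContainerTrigger``, using placeholder IDs:

.. code-block:: python

    from airflow.providers.amazon.aws.triggers.emr import EmrContainerTrigger

    trigger = EmrContainerTrigger(
        virtual_cluster_id="vc-1234",
        job_id="job-1234",
        aws_conn_id="aws_default",
        # Before: poll_interval=60
        waiter_delay=60,
    )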

* Utils

* Removed ``test_endpoint_url`` as possible key in ``extra_config`` from ``AwsConnectionWrapper``. Please set ``endpoint_url`` in ``service_config.sts`` within ``extras``
* Removed ``s3`` as possible value in ``conn_type`` from ``AwsConnectionWrapper``. Please update your connection to have ``conn_type='aws'``
* Removed ``session_kwargs`` as key in connection extra config. Please specify arguments passed to boto3 session directly
* Removed ``host`` from AWS connection, please set it in ``extra['endpoint_url']`` instead (see the sketch after this list)
* Removed ``region`` parameter from ``AwsHookParams``. Use ``region_name`` instead
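
Taken together, the connection-level changes look as follows; a sketch with placeholder endpoint and bucket values:

.. code-block:: python

    import json

    from airflow.models.connection import Connection

    conn = Connection(
        conn_id="aws_default",
        conn_type="aws",  # the s3 conn_type is no longer accepted
        extra=json.dumps(
            {
                "endpoint_url": "https://s3.us-east-1.amazonaws.com",  # replaces the removed host field
                "region_name": "us-east-1",
                "service_config": {"s3": {"bucket_name": "my-bucket"}},
            }
        ),
    )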

.. warning::
In order to support session reuse in RedshiftData operators, the following breaking changes were introduced:

The ``database`` argument is now optional and was therefore moved after the positional ``sql`` argument. Update your DAGs accordingly if they rely on positional argument order; a keyword-argument sketch follows the list below. Applies to:

* ``RedshiftDataHook``'s ``execute_query`` method
* ``RedshiftDataOperator``
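
A sketch of order-independent usage, with placeholder cluster and database names:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

    # database moved after the positional sql argument; passing both by keyword
    # keeps the DAG immune to the reordering
    run_query = RedshiftDataOperator(
        task_id="run_query",
        sql="SELECT 1;",
        database="dev",
        cluster_identifier="my-cluster",
    )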

20 changes: 3 additions & 17 deletions airflow/providers/amazon/aws/hooks/athena.py
@@ -25,10 +25,9 @@

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Collection

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils.waiter_with_logging import wait

@@ -56,7 +55,6 @@ class AthenaHook(AwsBaseHook):
Provide thick wrapper around
:external+boto3:py:class:`boto3.client("athena") <Athena.Client>`.

:param sleep_time: obsolete, please use the parameter of `poll_query_status` method instead
:param log_query: Whether to log athena query and other execution params
when it's executed. Defaults to *True*.

@@ -82,20 +80,8 @@ class AthenaHook(AwsBaseHook):
"CANCELLED",
)

def __init__(
self, *args: Any, sleep_time: int | None = None, log_query: bool = True, **kwargs: Any
) -> None:
def __init__(self, *args: Any, log_query: bool = True, **kwargs: Any) -> None:
super().__init__(client_type="athena", *args, **kwargs) # type: ignore
if sleep_time is not None:
self.sleep_time = sleep_time
warnings.warn(
"The `sleep_time` parameter of the Athena hook is deprecated, "
"please pass this parameter to the poll_query_status method instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
else:
self.sleep_time = 30 # previous default value
self.log_query = log_query
self.__query_results: dict[str, Any] = {}

@@ -291,7 +277,7 @@ def poll_query_status(
try:
wait(
waiter=self.get_waiter("query_complete"),
waiter_delay=self.sleep_time if sleep_time is None else sleep_time,
waiter_delay=30 if sleep_time is None else sleep_time,
waiter_max_attempts=max_polling_attempts or 120,
args={"QueryExecutionId": query_execution_id},
failure_message=f"Error while waiting for query {query_execution_id} to complete",
166 changes: 4 additions & 162 deletions airflow/providers/amazon/aws/hooks/base_aws.py
@@ -44,14 +44,12 @@
from botocore.config import Config
from botocore.waiter import Waiter, WaiterModel
from dateutil.tz import tzlocal
from deprecated import deprecated
from slugify import slugify

from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowNotFoundException,
AirflowProviderDeprecationWarning,
)
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
@@ -65,10 +63,11 @@
BaseAwsConnection = TypeVar("BaseAwsConnection", bound=Union[boto3.client, boto3.resource])

if TYPE_CHECKING:
from aiobotocore.session import AioSession
from botocore.client import ClientMeta
from botocore.credentials import ReadOnlyCredentials

from airflow.models.connection import Connection # Avoid circular imports.
from airflow.models.connection import Connection

_loader = botocore.loaders.Loader()
"""
@@ -172,9 +171,7 @@ def get_async_session(self):
session.register_component("data_loader", _loader)
return session

def create_session(
self, deferrable: bool = False
) -> boto3.session.Session | aiobotocore.session.AioSession:
def create_session(self, deferrable: bool = False) -> boto3.session.Session | AioSession:
"""Create boto3 or aiobotocore Session from connection config."""
if not self.conn:
self.log.info(
@@ -216,7 +213,7 @@ def _create_basic_session(self, session_kwargs: dict[str, Any]) -> boto3.session

def _create_session_with_assume_role(
self, session_kwargs: dict[str, Any], deferrable: bool = False
) -> boto3.session.Session | aiobotocore.session.AioSession:
) -> boto3.session.Session | AioSession:
if self.conn.assume_role_method == "assume_role_with_web_identity":
# Deferred credentials have no initial credentials
credential_fetcher = self._get_web_identity_credential_fetcher()
@@ -1029,158 +1026,3 @@ def resolve_session_factory() -> type[BaseSessionFactory]:


SessionFactory = resolve_session_factory()


def _parse_s3_config(config_file_name: str, config_format: str | None = "boto", profile: str | None = None):
"""For compatibility with airflow.contrib.hooks.aws_hook."""
from airflow.providers.amazon.aws.utils.connection_wrapper import _parse_s3_config

return _parse_s3_config(
config_file_name=config_file_name,
config_format=config_format,
profile=profile,
)


try:
import aiobotocore.credentials
from aiobotocore.session import AioSession, get_session
except ImportError:
pass


@deprecated(
reason=(
"`airflow.providers.amazon.aws.hook.base_aws.BaseAsyncSessionFactory` "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
class BaseAsyncSessionFactory(BaseSessionFactory):
"""
Base AWS Session Factory class to handle aiobotocore session creation.

It currently, handles ENV, AWS secret key and STS client method ``assume_role``
provided in Airflow connection
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

async def get_role_credentials(self) -> dict:
"""Get the role_arn, method credentials from connection and get the role credentials."""
async with self._basic_session.create_client("sts", region_name=self.region_name) as client:
response = await client.assume_role(
RoleArn=self.role_arn,
RoleSessionName=self._strip_invalid_session_name_characters(f"Airflow_{self.conn.conn_id}"),
**self.conn.assume_role_kwargs,
)
return response["Credentials"]

async def _get_refresh_credentials(self) -> dict[str, Any]:
self.log.debug("Refreshing credentials")
assume_role_method = self.conn.assume_role_method
if assume_role_method != "assume_role":
raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")

credentials = await self.get_role_credentials()

expiry_time = credentials["Expiration"].isoformat()
self.log.debug("New credentials expiry_time: %s", expiry_time)
credentials = {
"access_key": credentials.get("AccessKeyId"),
"secret_key": credentials.get("SecretAccessKey"),
"token": credentials.get("SessionToken"),
"expiry_time": expiry_time,
}
return credentials

def _get_session_with_assume_role(self) -> AioSession:
assume_role_method = self.conn.assume_role_method
if assume_role_method != "assume_role":
raise NotImplementedError(f"assume_role_method={assume_role_method} not expected")

credentials = aiobotocore.credentials.AioRefreshableCredentials.create_from_metadata(
metadata=self._get_refresh_credentials(),
refresh_using=self._get_refresh_credentials,
method="sts-assume-role",
)

session = aiobotocore.session.get_session()
session._credentials = credentials
return session

@cached_property
def _basic_session(self) -> AioSession:
"""Cached property with basic aiobotocore.session.AioSession."""
session_kwargs = self.conn.session_kwargs
aws_access_key_id = session_kwargs.get("aws_access_key_id")
aws_secret_access_key = session_kwargs.get("aws_secret_access_key")
aws_session_token = session_kwargs.get("aws_session_token")
region_name = session_kwargs.get("region_name")
profile_name = session_kwargs.get("profile_name")

aio_session = get_session()
if profile_name is not None:
aio_session.set_config_variable("profile", profile_name)
if aws_access_key_id or aws_secret_access_key or aws_session_token:
aio_session.set_credentials(
access_key=aws_access_key_id,
secret_key=aws_secret_access_key,
token=aws_session_token,
)
if region_name is not None:
aio_session.set_config_variable("region", region_name)
return aio_session

def create_session(self, deferrable: bool = False) -> AioSession:
"""Create aiobotocore Session from connection and config."""
if not self._conn:
self.log.info("No connection ID provided. Fallback on boto3 credential strategy")
return get_session()
elif not self.role_arn:
return self._basic_session
return self._get_session_with_assume_role()


@deprecated(
reason=(
"`airflow.providers.amazon.aws.hook.base_aws.AwsBaseAsyncHook` "
"has been deprecated and will be removed in future"
),
category=AirflowProviderDeprecationWarning,
)
class AwsBaseAsyncHook(AwsBaseHook):
"""
Interacts with AWS using aiobotocore asynchronously.

:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is None or empty then the default botocore behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
empty, then default botocore configuration would be used (and must be
maintained on each worker node).
:param verify: Whether to verify SSL certificates.
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:param client_type: boto3.client client_type. Eg 's3', 'emr' etc
:param resource_type: boto3.resource resource_type. Eg 'dynamodb' etc
:param config: Configuration for botocore client.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def get_async_session(self) -> AioSession:
"""Get the underlying aiobotocore.session.AioSession(...)."""
return BaseAsyncSessionFactory(
conn=self.conn_config, region_name=self.region_name, config=self.config
).create_session()

async def get_client_async(self):
"""Get the underlying aiobotocore client using aiobotocore session."""
return self.get_async_session().create_client(
self.client_type,
region_name=self.region_name,
verify=self.verify,
endpoint_url=self.conn_config.endpoint_url,
config=self.config,
)
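
# With BaseAsyncSessionFactory and AwsBaseAsyncHook removed, an aiobotocore session
# comes from the regular session factory via the deferrable flag shown in the
# create_session hunk above. A minimal sketch, not part of this diff:
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook, BaseSessionFactory

hook = AwsBaseHook(aws_conn_id="aws_default", client_type="s3")
factory = BaseSessionFactory(conn=hook.conn_config, region_name=hook.region_name, config=hook.config)
aio_session = factory.create_session(deferrable=True)  # aiobotocore.session.AioSession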