Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airflow/providers/amazon/aws/hooks/base_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import logging
import warnings
from functools import wraps
from typing import Any, Callable, Generic, TypeVar, Union
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar, Union

import boto3
import botocore
Expand All @@ -46,14 +46,17 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.secrets_masker import mask_secret

BaseAwsConnection = TypeVar("BaseAwsConnection", bound=Union[boto3.client, boto3.resource])


if TYPE_CHECKING:
from airflow.models.connection import Connection # Avoid circular imports.


class BaseSessionFactory(LoggingMixin):
"""
Base AWS Session Factory class to handle boto3 session creation.
Expand Down
49 changes: 43 additions & 6 deletions airflow/providers/amazon/aws/utils/connection_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import json
import warnings
from copy import deepcopy
from dataclasses import MISSING, InitVar, dataclass, field, fields
Expand All @@ -39,7 +40,44 @@ class ArgNotSet: # type: ignore[no-redef]
NOTSET = ArgNotSet()

if TYPE_CHECKING:
from airflow.models.connection import Connection
from airflow.models.connection import Connection # Avoid circular imports.


@dataclass
class _ConnectionMetadata:
"""Connection metadata data-class.

This class implements main :ref:`~airflow.models.connection.Connection` attributes
and use in AwsConnectionWrapper for avoid circular imports.

Only for internal usage, this class might change or removed in the future.
"""

conn_id: str | None = None
conn_type: str | None = None
description: str | None = None
host: str | None = None
login: str | None = None
password: str | None = None
schema: str | None = None
port: int | None = None
extra: str | dict | None = None

@property
def extra_dejson(self):
if not self.extra:
return {}
extra = deepcopy(self.extra)
if isinstance(extra, str):
try:
extra = json.loads(extra)
except json.JSONDecodeError as err:
raise AirflowException(
f"'extra' expected valid JSON-Object string. Original error:\n * {err}"
) from None
if not isinstance(extra, dict):
raise TypeError(f"Expected JSON-Object or dict, got {type(extra).__name__}.")
return extra


@dataclass
Expand All @@ -61,7 +99,7 @@ class AwsConnectionWrapper(LoggingMixin):
3. The wrapper's default value
"""

conn: InitVar[Connection | AwsConnectionWrapper | None]
conn: InitVar[Connection | AwsConnectionWrapper | _ConnectionMetadata | None]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consolidate these types a bit, perhaps with a Protocol.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree with you. Main problem with this class that when I initially create this helper I added InitVar and add some limitation with define attributes for this class. I have a plan to refactor it later and also get rid of _ConnectionMetadata.

region_name: str | None = field(default=None)
# boto3 client/resource configs
botocore_config: Config | None = field(default=None)
Expand Down Expand Up @@ -242,11 +280,10 @@ def from_connection_metadata(
:param password: AWS Secret Access Key.
:param extra: Connection Extra metadata.
"""
from airflow.models.connection import Connection

return cls(
conn=Connection(conn_id=conn_id, conn_type="aws", login=login, password=password, extra=extra)
conn_meta = _ConnectionMetadata(
conn_id=conn_id, conn_type="aws", login=login, password=password, extra=extra
)
return cls(conn=conn_meta)

@property
def extra_dejson(self):
Expand Down
31 changes: 30 additions & 1 deletion tests/providers/amazon/aws/utils/test_connection_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from botocore.config import Config

from airflow.models import Connection
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper, _ConnectionMetadata

MOCK_AWS_CONN_ID = "mock-conn-id"
MOCK_CONN_TYPE = "aws"
Expand All @@ -36,6 +36,35 @@ def mock_connection_factory(
return Connection(conn_id=conn_id, conn_type=conn_type, **kwargs)


class TestsConnectionMetadata:
@pytest.mark.parametrize("extra", [{"foo": "bar", "spam": "egg"}, '{"foo": "bar", "spam": "egg"}', None])
def test_compat_with_connection(self, extra):
"""Simple compatibility test with `airflow.models.connection.Connection`."""
conn_kwargs = {
"conn_id": MOCK_AWS_CONN_ID,
"conn_type": "aws",
"login": "mock-login",
"password": "mock-password",
"extra": extra,
# AwsBaseHook never use this attributes from airflow.models.Connection
"host": "mock-host",
"schema": "mock-schema",
"port": 42,
}
conn = Connection(**conn_kwargs)
conn_meta = _ConnectionMetadata(**conn_kwargs)

assert conn.conn_id == conn_meta.conn_id
assert conn.conn_type == conn_meta.conn_type
assert conn.login == conn_meta.login
assert conn.password == conn_meta.password
assert conn.host == conn_meta.host
assert conn.schema == conn_meta.schema
assert conn.port == conn_meta.port

assert conn.extra_dejson == conn_meta.extra_dejson


class TestAwsConnectionWrapper:
@pytest.mark.parametrize("extra", [{"foo": "bar", "spam": "egg"}, '{"foo": "bar", "spam": "egg"}', None])
def test_values_from_connection(self, extra):
Expand Down