S3 ObjectStorage can't use AWS connection with role #39970

@arollet22

Description

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

8.16.0

Apache Airflow version

2.8.1

Operating System

Amazon Linux 2023.4.20240513

Deployment

Amazon (AWS) MWAA

Deployment details

Also tested on a standalone Airflow instance, which leads to the same error.

What happened

When trying to read a file with Object Storage with some code looking like this:

from airflow.io.path import ObjectStoragePath

base = ObjectStoragePath("s3://bucket/", conn_id="aws_test")
path = base / "path/to/key"
with path.open() as f:
    print(f.read())

I get the following traceback when the 'aws_test' connection is configured with a role_arn.

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/dps/example_dags/s3_object_storage.py", line 39, in read_file
    with path.open() as f:
         ^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 324, in open
    return self._accessor.open(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 63, in open
    return self._fs.open(self._format_path(path), mode, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1283, in open
    self.open(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1295, in open
    f = self._open(
        ^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 671, in _open
    return S3File(
           ^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 2110, in __init__
    super().__init__(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1651, in __init__
    self.size = self.details["size"]
                ^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1664, in details
    self._details = self.fs.info(self.path)
                    ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 1302, in _info
    out = await self._call_s3(
          ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 341, in _call_s3
    await self.set_session()
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 524, in set_session
    s3creator = self.session.create_client(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Session' object has no attribute 'create_client'

If the 'aws_test' connection is instead set with an Access Key ID and a Secret Access Key, I do not get the error.
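For what it's worth, the AttributeError looks like a duck-typing mismatch: s3fs's `set_session()` calls `session.create_client(...)`, which exists on aiobotocore/botocore sessions but not on a boto3-style `Session` (which only exposes `.client(...)`). A minimal sketch of the mismatch, with hypothetical stand-in classes (not the real library types):

```python
class Boto3LikeSession:
    """Stand-in for boto3.session.Session, which exposes .client() only."""
    def client(self, service_name):
        return f"{service_name}-client"

class AiobotocoreLikeSession:
    """Stand-in for aiobotocore's session, which exposes .create_client()."""
    def create_client(self, service_name):
        return f"{service_name}-client"

def set_session_like_s3fs(session):
    # s3fs's set_session() ends up calling session.create_client("s3", ...),
    # so any session object passed in must expose that method.
    return session.create_client("s3")

print(set_session_like_s3fs(AiobotocoreLikeSession()))  # s3-client
try:
    set_session_like_s3fs(Boto3LikeSession())
except AttributeError as exc:
    # mirrors: 'Session' object has no attribute 'create_client'
    print(exc)
```

If the Airflow provider hands s3fs a boto3-style session when a role is assumed, this would explain why only the role-based connection fails.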

What you think should happen instead

ObjectStoragePath should support AWS connections configured with role ARNs, especially since assuming a role is more robust from a security standpoint than long-lived access keys.
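Since key-based connections do work, a possible stopgap (sketched below; the helper name and the placeholder credentials are mine, and the temporary credentials themselves would have to be obtained out of band, e.g. via `sts.assume_role()`) is to expose temporary STS credentials as an ordinary access-key connection:

```python
import json
import os
from urllib.parse import quote_plus

def make_temp_key_conn_uri(access_key: str, secret_key: str, session_token: str) -> str:
    """Build an Airflow 'aws' connection URI from temporary STS credentials."""
    # aws_session_token goes in the connection extra
    extra = {"aws_session_token": session_token}
    return (
        f"aws://{quote_plus(access_key)}:{quote_plus(secret_key)}@"
        f"?__extra__={quote_plus(json.dumps(extra))}"
    )

# Placeholder values; real ones would come from sts.assume_role().
os.environ["AIRFLOW_CONN_AWS_TEMP"] = make_temp_key_conn_uri(
    "AKIAEXAMPLE", "secret-example", "token-example"
)
```

This is only a workaround sketch, not a substitute for fixing role-based connections in ObjectStoragePath.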

How to reproduce

from datetime import datetime
import os
from urllib.parse import quote_plus

import json

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

from airflow.providers.amazon.aws.hooks.sts import StsHook

DAG_ID = "minimal_objectstorage_s3"

# Setting aws connection with role
AWS_ROLE = "ROLE-ARN-XXXXXXXXXXXXXXXXX"
EXTRA = {"role_arn": AWS_ROLE}
os.environ['AIRFLOW_CONN_AWS_DEFAULT2'] = f"aws://?__extra__={quote_plus(json.dumps(EXTRA))}"

# Setting aws connection with Access Keys
# ACCESS_KEY = "AAAAAAAAA"
# SECRET_KEY = "BBBBBBBBBB"
# os.environ['AIRFLOW_CONN_AWS_CONN'] = f"aws://{ACCESS_KEY}:{SECRET_KEY}@"


base = ObjectStoragePath("s3://bucket/", conn_id="aws_default2")

# dag definition:
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def read_file(path: ObjectStoragePath) -> str:

        with path.open() as f:
            return f.read()

    read_file(base / "path/to/key")

    @task
    def aws_identity():
        hook = StsHook(
            aws_conn_id='aws_default2'
        )

        print(hook.conn.get_caller_identity())

    aws_identity()
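As an aside, the same role-based connection can also be declared with Airflow's JSON connection serialization (supported since Airflow 2.3), which avoids the double URL-encoding of the `__extra__` URI form (the role ARN is the same placeholder as above):

```python
import json
import os

# JSON-format connection definition, equivalent to the URI with __extra__ above.
os.environ["AIRFLOW_CONN_AWS_DEFAULT2"] = json.dumps({
    "conn_type": "aws",
    "extra": {"role_arn": "ROLE-ARN-XXXXXXXXXXXXXXXXX"},
})
```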

Anything else

Happens every time.
The error seems to lie somewhere between Airflow and the upath or s3fs packages, but I'm not able to pin it down in their code.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
