Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
8.16.0
Apache Airflow version
2.8.1
Operating System
Amazon Linux 2023.4.20240513
Deployment
Amazon (AWS) MWAA
Deployment details
Also tested on a standalone instance, which leads to the same error.
What happened
When trying to read a file with Object Storage, with code looking like this:

base = ObjectStoragePath("s3://bucket/", conn_id="aws_test")
path = base / "path/to/key"
with path.open() as f:
    print(f.read())

I get a strange traceback when the 'aws_test' connection is configured with a role_arn:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
result = execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 199, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 216, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/dags/dps/example_dags/s3_object_storage.py", line 39, in read_file
with path.open() as f:
^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 324, in open
return self._accessor.open(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/upath/core.py", line 63, in open
return self._fs.open(self._format_path(path), mode, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1283, in open
self.open(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1295, in open
f = self._open(
^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 671, in _open
return S3File(
^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 2110, in __init__
super().__init__(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1651, in __init__
self.size = self.details["size"]
^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/spec.py", line 1664, in details
self._details = self.fs.info(self.path)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
raise return_result
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
result[0] = await coro
^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 1302, in _info
out = await self._call_s3(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 341, in _call_s3
await self.set_session()
File "/usr/local/airflow/.local/lib/python3.11/site-packages/s3fs/core.py", line 524, in set_session
s3creator = self.session.create_client(
^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Session' object has no attribute 'create_client'
If the 'aws_test' connection is set with an Access Key ID and Secret Access Key instead, I do not get the error.
What you think should happen instead
ObjectStoragePath should support AWS connections configured with a role ARN, especially since assuming a role is more robust from a security point of view than long-lived access keys.
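For reference, a role-based connection of this kind carries the role_arn in the connection extras. A quick stdlib round-trip (placeholder ARN, same encoding as in the reproduction below) shows what Airflow parses out of the URI:

```python
import json
from urllib.parse import parse_qs, quote_plus, urlparse

# Placeholder ARN, for illustration only.
extra = {"role_arn": "arn:aws:iam::123456789012:role/example"}
uri = f"aws://?__extra__={quote_plus(json.dumps(extra))}"

# Decoding the query string recovers the extras Airflow will see.
decoded = json.loads(parse_qs(urlparse(uri).query)["__extra__"][0])
print(decoded)  # {'role_arn': 'arn:aws:iam::123456789012:role/example'}
```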
How to reproduce
from datetime import datetime
import os
from urllib.parse import quote_plus
import json
from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath
from airflow.providers.amazon.aws.hooks.sts import StsHook
DAG_ID = "minimal_objectstorage_s3"
# Setting aws connection with role
AWS_ROLE = "ROLE-ARN-XXXXXXXXXXXXXXXXX"
EXTRA = {"role_arn": AWS_ROLE}
os.environ['AIRFLOW_CONN_AWS_DEFAULT2'] = f"aws://?__extra__={quote_plus(json.dumps(EXTRA))}"
# Setting aws connection with Access Keys
# ACCESS_KEY = "AAAAAAAAA"
# SECRET_KEY = "BBBBBBBBBB"
# os.environ['AIRFLOW_CONN_AWS_CONN'] = f"aws://{ACCESS_KEY}:{SECRET_KEY}@"
base = ObjectStoragePath("s3://bucket/", conn_id="aws_default2")

# dag definition:
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def read_file(path: ObjectStoragePath) -> str:
        with path.open() as f:
            return f.read()

    read_file(base / "path/to/key")

    @task
    def aws_identity():
        hook = StsHook(aws_conn_id="aws_default2")
        print(hook.conn.get_caller_identity())

    aws_identity()

Anything else
Happens every time.
The error seems to lie somewhere between Airflow and the universal-pathlib (upath) or s3fs packages, but I'm not able to pinpoint it in their code.
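The last frame of the traceback hints at a session-type mismatch: s3fs calls session.create_client(...), which aiobotocore's AioSession provides, while a boto3-style Session exposes .client() instead. A minimal sketch of that duck-typing failure, using stand-in classes rather than the real libraries:

```python
class AioSessionLike:
    """Mimics aiobotocore.session.AioSession: has create_client()."""
    def create_client(self, service_name):
        return f"async {service_name} client"

class Boto3SessionLike:
    """Mimics boto3.session.Session: has client(), not create_client()."""
    def client(self, service_name):
        return f"sync {service_name} client"

def set_session_like_s3fs(session):
    # Roughly the call s3fs.core.S3FileSystem.set_session makes on the
    # session object it was handed.
    return session.create_client("s3")

print(set_session_like_s3fs(AioSessionLike()))  # works

try:
    set_session_like_s3fs(Boto3SessionLike())
except AttributeError as err:
    print(err)  # same shape of error as in the traceback above
```

If this reading is right, the role_arn code path ends up handing s3fs a synchronous session object where an async-capable one is expected.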
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct