
Conversation


@moreaupascal56 commented Aug 19, 2022


closes: #25833

The problem description is in the issue, but the main idea is to allow the use of encryption keys in the download_file function.
Passing the extra_args to load fixes the Bad Request error, because it lets us call

response = client.head_object(
    Bucket='string',
    IfMatch='string',
    IfModifiedSince=datetime(2015, 1, 1),
    IfNoneMatch='string',
    IfUnmodifiedSince=datetime(2015, 1, 1),
    Key='string',
    Range='string',
    VersionId='string',
    SSECustomerAlgorithm='string',
    SSECustomerKey='string',
    RequestPayer='requester',
    PartNumber=123,
    ExpectedBucketOwner='string',
    ChecksumMode='ENABLED'
)

with its arguments.
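
For illustration, a rough usage sketch (the connection id, bucket, key, and SSE-C key material below are made-up placeholders) of how the hook would be used with encryption arguments once extra_args reach download_file:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Placeholder values; the SSE-C key must match the key used when the object was uploaded.
hook = S3Hook(
    aws_conn_id="aws_default",
    extra_args={
        "SSECustomerAlgorithm": "AES256",
        "SSECustomerKey": "<base64-encoded-256-bit-key>",
    },
)
local_path = hook.download_file(key="path/to/encrypted/object", bucket_name="my-bucket")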


boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Aug 19, 2022

boring-cyborg bot commented Aug 19, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

)
obj = s3_resource.Object(bucket_name, key)
- obj.load()
+ obj.load(**self.extra_args)
Contributor

Are you sure that you can pass anything to the S3.Object.load() method?

Also, please have a look at this: extra_args is only expected to be used in these S3 client methods.

And I am not sure the extra_args values are fully compatible with obj.load().

Contributor

I would suggest removing this part

try:
    s3_obj = self.get_key(key, bucket_name)
except ClientError as e:
    if e.response.get('Error', {}).get('Code') == 404:
        raise AirflowException(
            f'The source file in Bucket {bucket_name} with path {key} does not exist'
        )
    else:
        raise e

At the moment, this method first checks whether the object exists before downloading it, and raises an error if it does not exist.
This check could cause issues in some cases.

It might be better not to use boto3.resource("s3").Object at all and just try to download the object via boto3.client("s3") instead; if any error happens, we can still catch it and inform users.

Example:

        # Requires: from tempfile import NamedTemporaryFile; from pathlib import Path;
        #           from botocore.exceptions import ClientError
        with NamedTemporaryFile(dir=local_path, prefix='airflow_tmp_', delete=False) as local_tmp_file:
            err = None
            try:
                self.get_conn().download_fileobj(
                    bucket_name,
                    key,
                    local_tmp_file,
                    ExtraArgs=self.extra_args,
                    Config=self.transfer_config,
                )
            except Exception as e:
                err = True
                if isinstance(e, ClientError):
                    ...
                    # Do some error handling here
                raise
            finally:
                if err:
                    # On failure, close and remove the partially downloaded temp file.
                    if not local_tmp_file.closed:
                        local_tmp_file.close()
                    Path(local_tmp_file.name).unlink()

WDYT @moreaupascal56

cc: @o-nikolas @vincbeck @ferruzzi

Contributor

A few things are getting conflated in this thread.

1) The PR code change:

It looks like you're just unpacking the contents of extra_args into the load call. I agree with @Taragolis that I'm not sure obj.load expects all the things it could receive from that dict (or whether any of them would even help with encryption). Object.load (docs) basically just calls the S3 client's head_object (docs), which is one of those weird S3 APIs:

A HEAD request has the same options as a GET action on an object. The response is identical to the GET response except that there is no response body. Because of this, if the HEAD request generates an error, it returns a generic 404 Not Found or 403 Forbidden code. It is not possible to retrieve the exact exception beyond these error codes.

To handle encryption when using that API you have to set request headers (see again the previous doc link):

If you encrypt an object by using server-side encryption with customer-provided encryption keys (SSE-C) when you store the object in Amazon S3, then when you retrieve the metadata from the object, you must use the following headers:

x-amz-server-side-encryption-customer-algorithm
x-amz-server-side-encryption-customer-key
x-amz-server-side-encryption-customer-key-MD5

2) Andrey's suggested change:

At that moment in this method first we try to check if object exists before download and raise an error if not exists.
This check could cause an issues in some cases.

Just so that I fully understand your proposal, what are some cases you're thinking of? Also, this change seems out of scope for what @moreaupascal56 is trying to achieve?

Also, also, for your suggested code snippet: you're explicitly closing and unlinking the temp file; can you not just let the context manager do that?

Contributor

@Taragolis commented Aug 20, 2022

Just so that I fully understand your proposal, what are some cases you're thinking of?

Both the general case and @moreaupascal56's case. My idea is just to use the more common Pythonic approach: Easier to Ask for Forgiveness than Permission.

Most of the code was added about 2 years ago and most probably just migrated from pre-Airflow 2, and I think the current implementation does not cover all use cases.

Initially, the selected method tries to get information about the object but does not pass any of the boto3.s3.transfer Download Arguments there: 'ChecksumMode', 'VersionId', 'SSECustomerAlgorithm', 'SSECustomerKey', 'SSECustomerKeyMD5', 'RequestPayer', 'ExpectedBucketOwner'.

If the user wants a specific VersionId, it actually checks the latest VersionId or a delete marker (need to verify, just my thoughts).

Example

hook = S3Hook(aws_conn_id="aws_default", extra_args={"VersionId": "AAAAAAAA"})
hook.download_file(key="some/awesome/key.parquet", bucket_name="awesome-bucket")

So what might happen here?
s3_obj = self.get_key(key, bucket_name) might return an object with Version ID "AAAAAAAA" or "BBBBBBBB".

After that we try to download the file and pass ExtraArgs = {"VersionId": "AAAAAAAA"}, and I think it is possible to get some funny error if s3_obj referenced Version ID "BBBBBBBB".

So my idea is to get rid of the self.get_key call and just use S3.Client.download_fileobj() instead, which handles passing the correct headers for customer-provided encryption and/or Version ID; see the sketch below.
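
For illustration only, a minimal client-level sketch (reusing the placeholder bucket, key, and version id from the example above, plus an assumed SSE-C key) of downloading a versioned, SSE-C encrypted object without a prior head_object/load() check:

import boto3

s3_client = boto3.client("s3")
# download_fileobj forwards ExtraArgs as the proper request parameters/headers,
# so no separate existence check via head_object is needed.
with open("/tmp/key.parquet", "wb") as fileobj:
    s3_client.download_fileobj(
        Bucket="awesome-bucket",
        Key="some/awesome/key.parquet",
        Fileobj=fileobj,
        ExtraArgs={
            "VersionId": "AAAAAAAA",
            "SSECustomerAlgorithm": "AES256",
            "SSECustomerKey": "<base64-encoded-256-bit-key>",
        },
    )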

you're explicitly closing and unlinking the temp file, do you not just the context manager to do so?

As I understand it, the current code correctly uses NamedTemporaryFile to create a unique file inside the Airflow worker, and explicitly sets that this file shouldn't be deleted, because this method returns the file path of the downloaded file.

In my snippet I just manually close the file if any error happens (because we are still inside the context manager) and unlink it, because after an error this file is not useful anymore and we can remove it.
Some examples of errors which could leave unwanted files behind, none of them related to permissions:

  • A connection error in the middle of downloading the file
  • The user tries to download a 20 GiB file into a Fargate temp directory and gets "No space left on device"

If everything is alright, the context manager closes the file.

Contributor

The context manager will clean up the file even in the case of an exception (see here), so there is no need to do any file handling in a finally. This is the beauty of Context Managers! 😃

@o-nikolas
Contributor

The problem description is in the issue, but the main idea is...

Can you link the issue?

@Taragolis
Contributor

I think it's this issue: #25833

@Taragolis
Contributor

BTW, I'm the person who added ExtraArgs to the S3Hook.download_file method and gave users false hope that it would work 🙄

@moreaupascal56
Author

Hello all, thank you for the responses. Yes, this is about issue #25833 (I was sure I had mentioned it, my bad).

I have no strong opinion, but yes, I am not sure about the usefulness of the get_key() and load() calls in the case of S3Hook.download_file().
But to summarize, there are two options:

  • skip the get_key() function
  • filter the extra_args to keep only the values compatible with obj.load()

What do you think?

@o-nikolas
Contributor

Hello all, thank you for the responses. Yes, this is about issue #25833 (I was sure I had mentioned it, my bad).

I have no strong opinion, but yes, I am not sure about the usefulness of the get_key() and load() calls in the case of S3Hook.download_file(). But to summarize, there are two options:

* skip the `get_key()` function

* filter the `extra_args` to keep only the values compatible with `obj.load()`

What do you think?

I think, in general, I agree with @Taragolis here. The S3 code is a bit old and unnecessarily complicated in many ways. It might be best to switch away from fetching the S3 object resource and calling load on it, and instead use the client API directly, simply using get_object (though there may be some historical reason that it is the way it is now). WDYT @moreaupascal56?
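
For illustration only, a rough sketch (placeholder bucket, key, and SSE-C key; not the actual hook change) of what a client-first get_object call could look like, with ClientError carrying the real S3 error code instead of the generic HEAD 404/403:

import boto3
from botocore.exceptions import ClientError

client = boto3.client("s3")
try:
    response = client.get_object(
        Bucket="awesome-bucket",
        Key="some/awesome/key.parquet",
        SSECustomerAlgorithm="AES256",
        SSECustomerKey="<base64-encoded-256-bit-key>",
    )
    data = response["Body"].read()
except ClientError as error:
    # Unlike a bare HEAD request, the GET error response carries the real S3 error code
    # (e.g. NoSuchKey), which could be surfaced to the user before re-raising.
    raise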

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale (Stale PRs per the .github/workflows/stale.yml policy file) label Oct 13, 2022
github-actions bot closed this Oct 18, 2022
@ferruzzi
Contributor

ferruzzi commented Oct 6, 2023

This is pretty old, but if it's still an issue, how do we feel about accepting extra_args and then checking it against boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS and passing only the allowed values into ExtraArgs?

Based on the S3 API docs here, load() calls head_object(), and head_object() accepts all of the parameters listed in ALLOWED_DOWNLOAD_ARGS, so that seems to me like the best route short of a full overhaul of the S3 hook (which IMHO is also long overdue).

TL;DR: my proposal is the same as this PR, except that we ensure that only valid keys are provided to load() by checking them against the provided API list
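
Something like this minimal sketch (the self.extra_args, s3_resource, bucket_name, and key names are assumed to mirror the existing hook code; this is not the final implementation):

from boto3.s3.transfer import S3Transfer

# Keep only the extra_args keys that boto3 allows for downloads; head_object accepts all of them.
allowed_extra_args = {
    arg_name: value
    for arg_name, value in self.extra_args.items()
    if arg_name in S3Transfer.ALLOWED_DOWNLOAD_ARGS
}
obj = s3_resource.Object(bucket_name, key)
obj.load(**allowed_extra_args)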


A quick script that checks the Boto API and returns a dict {name: type} of all available parameters:

import boto3


def get_boto_params(service_name, method_name):
    client_meta = boto3.client(service_name).meta

    # Map the Python method name (e.g. "head_object") to the API operation name (e.g. "HeadObject"),
    # then pull the operation's input shape from the service model.
    api_method = client_meta.method_to_api_mapping[method_name]
    service_model = client_meta.service_model
    ops_model = service_model.operation_model(api_method)
    inputs = ops_model.input_shape

    return {
        'available': dict(inputs.members),
        **inputs.metadata,
    }

get_boto_params("s3", "head_object") returns:

{'available': {'Bucket': <StringShape(BucketName)>,
 'IfMatch': <StringShape(IfMatch)>,
 'IfModifiedSince': <Shape(IfModifiedSince)>,
 'IfNoneMatch': <StringShape(IfNoneMatch)>,
 'IfUnmodifiedSince': <Shape(IfUnmodifiedSince)>,
 'Key': <StringShape(ObjectKey)>,
 'Range': <StringShape(Range)>,
 'VersionId': <StringShape(ObjectVersionId)>,
 'SSECustomerAlgorithm': <StringShape(SSECustomerAlgorithm)>,
 'SSECustomerKey': <StringShape(SSECustomerKey)>,
 'SSECustomerKeyMD5': <StringShape(SSECustomerKeyMD5)>,
 'RequestPayer': <StringShape(RequestPayer)>,
 'PartNumber': <Shape(PartNumber)>,
 'ExpectedBucketOwner': <StringShape(AccountId)>,
 'ChecksumMode': <StringShape(ChecksumMode)>},
'required': ['Bucket', 'Key']}
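
And a quick check (assumed, but straightforward to run alongside the script above) that every entry in ALLOWED_DOWNLOAD_ARGS is indeed a valid head_object parameter, which is what makes the filtering approach safe:

from boto3.s3.transfer import S3Transfer

head_object_params = set(get_boto_params("s3", "head_object")["available"])
# Every allowed download arg is accepted by head_object, so a filtered extra_args dict
# can be forwarded to load()/head_object() without a parameter validation error.
assert set(S3Transfer.ALLOWED_DOWNLOAD_ARGS) <= head_object_params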


Successfully merging this pull request may close these issues.

Airflow Amazon provider S3Hook().download_file() fail when needs encryption arguments (SSECustomerKey etc..)
