-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Adding preserve_file_name param to S3Hook.download_file method
#26886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3249b6f
f523999
841b713
4428fad
ff8b5b4
bea904e
1b824ce
0ddb262
64e4e2b
3cc4c6a
c78dbbd
e62ecea
9300ade
67daea3
f525905
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,9 +29,10 @@ | |
| from inspect import signature | ||
| from io import BytesIO | ||
| from pathlib import Path | ||
| from tempfile import NamedTemporaryFile | ||
| from tempfile import NamedTemporaryFile, gettempdir | ||
| from typing import Any, Callable, List, TypeVar, cast | ||
| from urllib.parse import urlparse | ||
| from uuid import uuid4 | ||
|
|
||
| from boto3.s3.transfer import S3Transfer, TransferConfig | ||
| from botocore.exceptions import ClientError | ||
|
|
@@ -879,17 +880,38 @@ def delete_objects(self, bucket: str, keys: str | list) -> None: | |
|
|
||
| @provide_bucket_name | ||
| @unify_bucket_name_and_key | ||
| def download_file(self, key: str, bucket_name: str | None = None, local_path: str | None = None) -> str: | ||
| def download_file( | ||
| self, | ||
| key: str, | ||
| bucket_name: str | None = None, | ||
| local_path: str | None = None, | ||
| preserve_file_name: bool = False, | ||
| use_autogenerated_subdir: bool = True, | ||
| ) -> str: | ||
| """ | ||
| Downloads a file from the S3 location to the local file system. | ||
|
|
||
| :param key: The key path in S3. | ||
| :param bucket_name: The specific bucket to use. | ||
| :param local_path: The local path to the downloaded file. If no path is provided it will use the | ||
| system's temporary directory. | ||
| :param preserve_file_name: If you want the downloaded file name to be the same name as it is in S3, | ||
| set this parameter to True. When set to False, a random filename will be generated. | ||
| Default: False. | ||
| :param use_autogenerated_subdir: Pairs with 'preserve_file_name = True' to download the file into a | ||
| random generated folder inside the 'local_path', useful to avoid collisions between various tasks | ||
| that might download the same file name. Set it to 'False' if you don't want it, and you want a | ||
| predictable path. | ||
| Default: True. | ||
| :return: the file name. | ||
| :rtype: str | ||
| """ | ||
| self.log.info( | ||
| "This function shadows the 'download_file' method of S3 API, but it is not the same. If you " | ||
| "want to use the original method from S3 API, please call " | ||
| "'S3Hook.get_conn().download_file()'" | ||
| ) | ||
|
|
||
| self.log.info("Downloading source S3 file from Bucket %s with path %s", bucket_name, key) | ||
|
|
||
| try: | ||
|
|
@@ -902,14 +924,30 @@ def download_file(self, key: str, bucket_name: str | None = None, local_path: st | |
| else: | ||
| raise e | ||
|
|
||
| with NamedTemporaryFile(dir=local_path, prefix="airflow_tmp_", delete=False) as local_tmp_file: | ||
| if preserve_file_name: | ||
| local_dir = local_path if local_path else gettempdir() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’m a bit worried that using the temp dir directly with a predictive file name may cause a vulnarability. I don’t have concrete examples, but the combination is sort of a red flag.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYM? I do not quite understand why it may cause a vulnerability. Do you think it's better to stay with the older implementation of renaming the file after it's already been created? I think that this way is a bit cleaner, but I'm also ok with also "reverting" to the old flow..
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, in all flows, the user can still have a file kept in S3 with a name that can cause vulnerability and later saved in the temp directory that will be generated using the same function (if we don't provide a This is not different, it means that we can't implement this feature at all?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To follow up on what @uranusjr 's thought: Say we are using LocalExecutor or CeleryExecutor, so two users' jobs can be executed on the same host. Here you are having But just a vague thinking and very likely I missed something. Please feel free to point out.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The risk can be greatly reduced if the file is put in a subdirectory instead of directly inside the temp directory root (so the full path the file is downloaded to remains unpredictable), but that may lead to additional cleanup issues since directories are more finicky than files. I’d be happy tif it works though.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do also like option 1, it solves many of the complexities you pointed out (or at least bubbles them up to the user) and also allows the user to create a path that is predictable, so this is probably my preference. But they should be able to provide a full sub path within tmp so that they can organize files with similar names to their preference. Although, option 2 would be perfectly serviceable as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eventually, I did 2 things:
@o-nikolas @uranusjr @XD-DENG Will appreciate your review of the latest additions to this flow 🙏
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @alexkruc, thanks for sticking with this! The method stub is a little complicated now, but I think it's a decent middle ground given all the constraints that came up in the discussions here 👍
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @o-nikolas Thanks!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| subdir = f"airflow_tmp_dir_{uuid4().hex[0:8]}" if use_autogenerated_subdir else "" | ||
| filename_in_s3 = s3_obj.key.rsplit("/", 1)[-1] | ||
| file_path = Path(local_dir, subdir, filename_in_s3) | ||
|
|
||
| if file_path.is_file(): | ||
| self.log.error("file '%s' already exists. Failing the task and not overwriting it", file_path) | ||
| raise FileExistsError | ||
|
|
||
| file_path.parent.mkdir(exist_ok=True, parents=True) | ||
|
|
||
| file = open(file_path, "wb") | ||
| else: | ||
| file = NamedTemporaryFile(dir=local_path, prefix="airflow_tmp_", delete=False) # type: ignore | ||
|
|
||
| with file: | ||
| s3_obj.download_fileobj( | ||
| local_tmp_file, | ||
| file, | ||
| ExtraArgs=self.extra_args, | ||
| Config=self.transfer_config, | ||
| ) | ||
|
|
||
| return local_tmp_file.name | ||
| return file.name | ||
|
|
||
| def generate_presigned_url( | ||
| self, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Taragolis I've added a log message here to show that this function shadows boto's method, hope that's fine :)