1
+ """S3 storage implementation."""
2
+ import io
3
+ from typing import BinaryIO , List
4
+
5
+ import boto3
6
+ from botocore .exceptions import ClientError
7
+
8
+ from application .storage .base import BaseStorage
9
+
10
+
11
+ class S3Storage (BaseStorage ):
12
+ """AWS S3 storage implementation."""
13
+
14
+ def __init__ (self , bucket_name : str , aws_access_key_id = None ,
15
+ aws_secret_access_key = None , region_name = None ):
16
+ """
17
+ Initialize S3 storage.
18
+
19
+ Args:
20
+ bucket_name: S3 bucket name
21
+ aws_access_key_id: AWS access key ID (optional if using IAM roles)
22
+ aws_secret_access_key: AWS secret access key (optional if using IAM roles)
23
+ region_name: AWS region name (optional)
24
+ """
25
+ self .bucket_name = bucket_name
26
+
27
+ # Initialize S3 client
28
+ self .s3 = boto3 .client (
29
+ 's3' ,
30
+ aws_access_key_id = aws_access_key_id ,
31
+ aws_secret_access_key = aws_secret_access_key ,
32
+ region_name = region_name
33
+ )
34
+
35
+ def save_file (self , file_data : BinaryIO , path : str ) -> str :
36
+ """Save a file to S3 storage."""
37
+ self .s3 .upload_fileobj (file_data , self .bucket_name , path )
38
+ return path
39
+
40
+ def get_file (self , path : str ) -> BinaryIO :
41
+ """Get a file from S3 storage."""
42
+ if not self .file_exists (path ):
43
+ raise FileNotFoundError (f"File not found: { path } " )
44
+
45
+ file_obj = io .BytesIO ()
46
+ self .s3 .download_fileobj (self .bucket_name , path , file_obj )
47
+ file_obj .seek (0 )
48
+ return file_obj
49
+
50
+ def delete_file (self , path : str ) -> bool :
51
+ """Delete a file from S3 storage."""
52
+ try :
53
+ self .s3 .delete_object (Bucket = self .bucket_name , Key = path )
54
+ return True
55
+ except ClientError :
56
+ return False
57
+
58
+ def file_exists (self , path : str ) -> bool :
59
+ """Check if a file exists in S3 storage."""
60
+ try :
61
+ self .s3 .head_object (Bucket = self .bucket_name , Key = path )
62
+ return True
63
+ except ClientError :
64
+ return False
65
+
66
+ def list_files (self , directory : str ) -> List [str ]:
67
+ """List all files in a directory in S3 storage."""
68
+ # Ensure directory ends with a slash if it's not empty
69
+ if directory and not directory .endswith ('/' ):
70
+ directory += '/'
71
+
72
+ result = []
73
+ paginator = self .s3 .get_paginator ('list_objects_v2' )
74
+ pages = paginator .paginate (Bucket = self .bucket_name , Prefix = directory )
75
+
76
+ for page in pages :
77
+ if 'Contents' in page :
78
+ for obj in page ['Contents' ]:
79
+ result .append (obj ['Key' ])
80
+
81
+ return result
0 commit comments