1
1
"""S3 storage implementation."""
2
2
import io
3
- from typing import BinaryIO , List
3
+ from typing import BinaryIO , List , Callable
4
4
5
5
import boto3
6
6
from botocore .exceptions import ClientError
@@ -24,7 +24,6 @@ def __init__(self, bucket_name: str, aws_access_key_id=None,
24
24
"""
25
25
self .bucket_name = bucket_name
26
26
27
- # Initialize S3 client
28
27
self .s3 = boto3 .client (
29
28
's3' ,
30
29
aws_access_key_id = aws_access_key_id ,
@@ -78,4 +77,38 @@ def list_files(self, directory: str) -> List[str]:
78
77
for obj in page ['Contents' ]:
79
78
result .append (obj ['Key' ])
80
79
81
- return result
80
+ return result
81
+
82
def process_file(self, path: str, processor_func: Callable, **kwargs):
    """
    Process a file using the provided processor function.

    For S3 storage the object is first downloaded to a local temporary
    file; the processor is then called with that file's path. The
    temporary file is removed afterwards, whether the download or the
    processor succeeds or raises.

    Args:
        path: Path (key) of the file inside the bucket.
        processor_func: Callable invoked as
            ``processor_func(file_path=<temporary path>, **kwargs)``.
        **kwargs: Additional arguments to pass to the processor function.

    Returns:
        The result of the processor function.

    Raises:
        FileNotFoundError: If ``self.file_exists(path)`` reports that the
            file is missing.
    """
    import logging
    import os
    import tempfile

    if not self.file_exists(path):
        raise FileNotFoundError(f"File not found: {path}")

    # delete=False so the file survives being closed: the processor
    # reopens it by path, so we close our handle first and unlink manually.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_path = temp_file.name
    try:
        # The download lives inside the try block so that a failed
        # download does not leave the temporary file behind.
        with temp_file:
            self.s3.download_fileobj(self.bucket_name, path, temp_file)
        return processor_func(file_path=temp_path, **kwargs)
    finally:
        # Best-effort cleanup: a failed unlink is logged, never raised,
        # so it cannot mask the processor's result or exception.
        try:
            os.unlink(temp_path)
        except OSError as e:
            logging.warning(f"Failed to delete temporary file: {e}")
0 commit comments