Skip to content

Commit 2b20e09

Browse files
authored
Merge pull request #2 from harshanand-1/s3DataStorage
Implement S3 DataStorage
2 parents 1b0f99e + b8a5ee5 commit 2b20e09

File tree

8 files changed

+247
-0
lines changed

8 files changed

+247
-0
lines changed

filename

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"LatD", "LatM", "LatS", "NS", "LonD", "LonM", "LonS", "EW", "City", "State"
2+
41, 5, 59, "N", 80, 39, 0, "W", "Youngstown", OH
3+
42, 52, 48, "N", 97, 23, 23, "W", "Yankton", SD
4+
46, 35, 59, "N", 120, 30, 36, "W", "Yakima", WA
5+
42, 16, 12, "N", 71, 48, 0, "W", "Worcester", MA
6+
43, 37, 48, "N", 89, 46, 11, "W", "Wisconsin Dells", WI
7+
36, 5, 59, "N", 80, 15, 0, "W", "Winston-Salem", NC
8+
49, 52, 48, "N", 97, 9, 0, "W", "Winnipeg", MB
9+
39, 11, 23, "N", 78, 9, 36, "W", "Winchester", VA
10+
34, 14, 24, "N", 77, 55, 11, "W", "Wilmington", NC
11+
39, 45, 0, "N", 75, 33, 0, "W", "Wilmington", DE
12+
48, 9, 0, "N", 103, 37, 12, "W", "Williston", ND
13+
41, 15, 0, "N", 77, 0, 0, "W", "Williamsport", PA
14+
37, 40, 48, "N", 82, 16, 47, "W", "Williamson", WV

src/dataplane/DataStorage/s3/__init__.py

Whitespace-only changes.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
S3Client: Client for s3
3+
S3FilePath: /tmp/source.xlsx (needs UploadMethod="File")
4+
LocalFilePath: /General/hello.xlxs (where download method is File)
5+
Download Method: File or Object
6+
FilePath: /General/hello.xlxs
7+
"""
8+
9+
def s3_download(S3Client, Bucket, S3FilePath, DownloadMethod="Object", LocalFilePath=""):
10+
11+
from datetime import datetime
12+
from io import BytesIO
13+
14+
# Start the timer
15+
start = datetime.now()
16+
17+
# Download S3 file content to file - uses multi threaded download
18+
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.download_fileobj
19+
20+
if DownloadMethod == "File":
21+
if LocalFilePath == "":
22+
duration = datetime.now() - start
23+
return {"result":"Fail", "duration": str(duration), "reason":"File method requires a local file file path"}
24+
25+
with open(LocalFilePath, 'wb') as data:
26+
S3Client.download_fileobj(Bucket=Bucket, Key=S3FilePath, Fileobj=data)
27+
28+
duration = datetime.now() - start
29+
return {"result":"OK", "duration": str(duration), "FilePath": LocalFilePath}
30+
31+
# Download S3 file content to object
32+
objectGet = S3Client.get_object(Bucket=Bucket, Key=S3FilePath, ChecksumMode='ENABLED')["Body"].read()
33+
34+
duration = datetime.now() - start
35+
return {"result":"OK", "duration": str(duration), "content": objectGet}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""
2+
S3Client: Client for s3
3+
SourceFilePath: /tmp/source.xlsx (needs UploadMethod="File")
4+
TargetFilePath: /General/hello.xlxs
5+
UploadMethod: Object or File
6+
"""
7+
8+
def s3_upload(S3Client, Bucket, TargetFilePath, SourceFilePath="/tmp/default.txt", UploadMethod="Object", UploadObject=""):
9+
10+
from datetime import datetime
11+
from io import BytesIO
12+
13+
start = datetime.now()
14+
15+
# Connect to S3 Bucket
16+
17+
# ====== Obtain the file from disk ======
18+
if UploadMethod == "File":
19+
with open(SourceFilePath, 'rb') as data:
20+
S3Client.upload_fileobj(Fileobj=data, Bucket=Bucket, Key=TargetFilePath)
21+
duration = datetime.now() - start
22+
return {"result":"OK", "duration": str(duration), "Path":TargetFilePath}
23+
24+
# ====== Upload file using boto3 upload_file ======
25+
S3Client.upload_fileobj(Fileobj=BytesIO(UploadObject), Bucket=Bucket, Key=TargetFilePath)
26+
27+
duration = datetime.now() - start
28+
29+
return {"result":"OK", "duration": str(duration)}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
2+
import os
3+
from .s3_upload import s3_upload
4+
from .s3_download import s3_download
5+
from nanoid import generate
6+
import os
7+
from dotenv import load_dotenv
8+
import boto3
9+
from botocore.client import Config
10+
11+
def test_s3():
    """Round-trip a CSV file through minio via the File upload/download methods."""

    # ---------- DATAPLANE RUN ------------

    # Dataplane run id
    os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)

    # Load environment for the S3 (minio) connection details.
    load_dotenv()
    RUN_ID = os.environ["DP_RUNID"]

    # S3 connection
    S3Connect = boto3.client(
        's3',
        endpoint_url="http://minio:9000",
        aws_access_key_id="admin",
        aws_secret_access_key="hello123",
        config=Config(signature_version='s3v4'),
        region_name='us-east-1'
    )

    bucket = "dataplanebucket"

    CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__))

    # Remove any leftover download target from a previous run so the
    # download step below provably recreates it.
    if os.path.exists(CURRENT_DIRECTORY + "/test_cities_delete.csv"):
        os.remove(CURRENT_DIRECTORY + "/test_cities_delete.csv")

    # ---------- STORE FILE TO S3 ------------
    print(CURRENT_DIRECTORY)
    # Store the data - the run id is attached to the key so concurrent
    # test runs do not collide on the same object.
    rs = s3_upload(Bucket=bucket,
                   S3Client=S3Connect,
                   TargetFilePath=f"/s3test/myfile {RUN_ID}.csv",
                   SourceFilePath=CURRENT_DIRECTORY + "/test_s3_cities.csv",
                   UploadMethod="File"
                   )
    print(rs)
    assert rs["result"] == "OK"

    # ---------- RETRIEVE FILE FROM S3 ------------

    rs = s3_download(Bucket=bucket,
                     S3Client=S3Connect,
                     S3FilePath=f"/s3test/myfile {RUN_ID}.csv",
                     LocalFilePath=CURRENT_DIRECTORY + "/test_cities_delete.csv",
                     DownloadMethod="File"
                     )
    print(rs)
    assert rs["result"] == "OK"
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
"LatD", "LatM", "LatS", "NS", "LonD", "LonM", "LonS", "EW", "City", "State"
2+
41, 5, 59, "N", 80, 39, 0, "W", "Youngstown", OH
3+
42, 52, 48, "N", 97, 23, 23, "W", "Yankton", SD
4+
46, 35, 59, "N", 120, 30, 36, "W", "Yakima", WA
5+
42, 16, 12, "N", 71, 48, 0, "W", "Worcester", MA
6+
43, 37, 48, "N", 89, 46, 11, "W", "Wisconsin Dells", WI
7+
36, 5, 59, "N", 80, 15, 0, "W", "Winston-Salem", NC
8+
49, 52, 48, "N", 97, 9, 0, "W", "Winnipeg", MB
9+
39, 11, 23, "N", 78, 9, 36, "W", "Winchester", VA
10+
34, 14, 24, "N", 77, 55, 11, "W", "Wilmington", NC
11+
39, 45, 0, "N", 75, 33, 0, "W", "Wilmington", DE
12+
48, 9, 0, "N", 103, 37, 12, "W", "Williston", ND
13+
41, 15, 0, "N", 77, 0, 0, "W", "Williamsport", PA
14+
37, 40, 48, "N", 82, 16, 47, "W", "Williamson", WV
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
2+
import os
3+
from .s3_download import s3_download
4+
from .s3_upload import s3_upload
5+
from nanoid import generate
6+
import os
7+
from dotenv import load_dotenv
8+
import boto3
9+
from botocore.client import Config
10+
11+
def test_s3_object():
    """Round-trip CSV content through minio via the Object upload/download methods."""

    # ---------- DATAPLANE RUN ------------

    # Dataplane run id
    os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)

    # Load environment for the S3 (minio) connection details.
    load_dotenv()

    RUN_ID = os.environ["DP_RUNID"]

    # S3 connection
    S3Connect = boto3.client(
        's3',
        endpoint_url="http://minio:9000",
        aws_access_key_id="admin",
        aws_secret_access_key="hello123",
        config=Config(signature_version='s3v4'),
        region_name='us-east-1'
    )

    bucket = "dataplanebucket"

    CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__))

    # Remove any leftover download target from a previous run so the
    # write step below provably recreates it.
    if os.path.exists(CURRENT_DIRECTORY + "/test_cities_delete_object.csv"):
        os.remove(CURRENT_DIRECTORY + "/test_cities_delete_object.csv")

    # ---------- STORE OBJECT TO S3 ------------
    print(CURRENT_DIRECTORY)

    FileSize = os.path.getsize(CURRENT_DIRECTORY + "/test_s3_cities.csv")
    print("File size dir:", FileSize)
    # Read the fixture into memory; the with-block closes the handle
    # instead of leaking it.
    with open(CURRENT_DIRECTORY + "/test_s3_cities.csv", 'rb') as f:
        UploadObject = f.read()
    # Store the data - the run id is attached to the key so concurrent
    # test runs do not collide on the same object.
    rs = s3_upload(Bucket=bucket,
                   S3Client=S3Connect,
                   TargetFilePath=f"/s3test/object myfile {RUN_ID}.csv",
                   UploadObject=UploadObject,
                   UploadMethod="Object"
                   )
    print(rs)
    assert rs["result"] == "OK"

    # ---------- RETRIEVE OBJECT FROM S3 ------------
    rs = s3_download(Bucket=bucket,
                     S3Client=S3Connect,
                     S3FilePath=f"/s3test/object myfile {RUN_ID}.csv",
                     DownloadMethod="Object"
                     )
    print(rs)
    assert rs["result"] == "OK"

    # Persist the downloaded bytes for inspection.
    with open(CURRENT_DIRECTORY + "/test_cities_delete_object.csv", 'wb') as f:
        f.write(rs["content"])

src/dataplane/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
from dataplane.Microsoft.Sharepoint.sharepoint_download import sharepoint_download
1818
from dataplane.Microsoft.Sharepoint.sharepoint_upload import sharepoint_upload
1919

20+
# Data storage
21+
from dataplane.DataStorage.s3.s3_download import s3_download
22+
from dataplane.DataStorage.s3.s3_upload import s3_upload
2023

2124
__all__ = [
2225

@@ -34,4 +37,8 @@
3437
"sharepoint_download",
3538
"sharepoint_upload",
3639

40+
# Data storage providers
41+
"s3_download",
42+
"s3_upload"
43+
3744
]

0 commit comments

Comments
 (0)