
Commit

Fixed some S3 bugs and added roll cost
saeedamen committed Jul 22, 2021
1 parent c8ef674 commit 94851e3
Showing 4 changed files with 163 additions and 32 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -132,6 +132,9 @@ individual data providers)

# Coding log

* 22 Jul 2021
* Fixed S3 credentials management and added S3 file copy method
* Added roll costs
* 19 Jul 2021
* Added delete file method in `IOEngine` for S3
* 12 Jul 2021
120 changes: 96 additions & 24 deletions findatapy/market/ioengine.py
@@ -246,7 +246,7 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
filter_out_matching=None, timeout=10,
use_cache_compression=constants.use_cache_compression,
parquet_compression=constants.parquet_compression, use_pyarrow_directly=False,
md_request=None, ticker=None, ):
md_request=None, ticker=None, cloud_credentials=None):
"""Writes Pandas data frame to disk as HDF5 format or bcolz format or in Arctic
Parameters
@@ -272,6 +272,8 @@ def write_time_series_cache_to_disk(self, fname, data_frame,

logger = LoggerManager().getLogger(__name__)

if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if md_request is not None:
fname = self.path_join(fname, md_request.create_category_key(ticker=ticker))
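
The move from a constant default to `cloud_credentials=None` plus a runtime lookup is the usual Python late-binding pattern: a default expression is evaluated once when the function is defined, so credentials set or refreshed after import would otherwise never be picked up. A minimal sketch of the idea, with a hypothetical `Config` class standing in for findatapy's constants object:

```python
# Minimal sketch of the late-bound default pattern; Config is a hypothetical
# stand-in for findatapy's constants object, not the real class.
class Config:
    cloud_credentials = {"aws_anon": True}

def write_cache(fname, cloud_credentials=None):
    # Resolved at call time, so later changes to Config.cloud_credentials are seen
    if cloud_credentials is None:
        cloud_credentials = Config.cloud_credentials

    return fname, cloud_credentials

Config.cloud_credentials = {"aws_access_key": "AKIA...", "aws_secret_key": "..."}
print(write_cache("fx-cache.parquet"))  # picks up the credentials set above
```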

@@ -464,7 +466,7 @@ def write_time_series_cache_to_disk(self, fname, data_frame,
if fname[-5:] != '.gzip':
fname = fname + '.parquet'

self.to_parquet(data_frame, fname, cloud_credentials=constants.cloud_credentials, parquet_compression=parquet_compression,
self.to_parquet(data_frame, fname, cloud_credentials=cloud_credentials, parquet_compression=parquet_compression,
use_pyarrow_directly=use_pyarrow_directly)
# data_frame.to_parquet(fname, compression=parquet_compression)

@@ -906,7 +908,7 @@ def sanitize_path(self, path):

return path

def read_parquet(self, path, cloud_credentials=constants.cloud_credentials):
def read_parquet(self, path, cloud_credentials=None):
"""Reads a Pandas DataFrame from a local or s3 path
Parameters
@@ -921,6 +923,7 @@ def read_parquet(self, path, cloud_credentials=constants.cloud_credentials):
-------
DataFrame
"""
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if "s3://" in path:
storage_options = self._convert_cred(cloud_credentials, convert_to_s3fs=True)
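
For an `s3://` path the converted credentials are passed on to pandas as `storage_options`, which pandas in turn hands to s3fs. A hedged usage sketch of that read path; the bucket name, object key and credential values are placeholders, and a recent pandas (with `storage_options` support) plus s3fs is assumed:

```python
import pandas as pd

# Placeholder credentials in the s3fs naming that _convert_cred produces
storage_options = {"anon": False, "key": "AKIA...", "secret": "...", "token": None}

# Requires pandas with storage_options support and s3fs installed;
# bucket and object key below are hypothetical
df = pd.read_parquet("s3://my-bucket/cache/eurusd.parquet",
                     storage_options=storage_options)
print(df.head())
```
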
@@ -934,6 +937,10 @@ def _create_cloud_filesystem(self, cloud_credentials, filesystem_type):

cloud_credentials = self._convert_cred(cloud_credentials)

# os.environ["AWS_ACCESS_KEY_ID"] = cloud_credentials['aws_access_key']
# os.environ["AWS_SECRET_ACCESS_KEY"] = cloud_credentials['aws_secret_key']
# os.environ["AWS_SESSION_TOKEN"] = cloud_credentials['aws_session_token']

if 's3_pyarrow' == filesystem_type:
return pyarrow.fs.S3FileSystem(anon=cloud_credentials['aws_anon'],
access_key=cloud_credentials['aws_access_key'],
@@ -954,7 +961,7 @@ def _convert_cred(self, cloud_credentials, convert_to_s3fs=False):

boolean_keys = {'aws_anon' : False}

mappings = {'aws_anon' : 'anon',
mappings = {'aws_anon': 'anon',
'aws_access_key': 'key',
'aws_secret_key': 'secret',
'aws_session_token': 'token'
@@ -968,6 +975,7 @@ def _convert_cred(self, cloud_credentials, convert_to_s3fs=False):
cloud_credentials[m] = None

# Converts the field names eg. aws_access_key => key etc.
# Mainly for using pd.read_parquet
if convert_to_s3fs:

cloud_credentials_temp = {}
@@ -979,7 +987,7 @@ def _convert_cred(self, cloud_credentials, convert_to_s3fs=False):

return cloud_credentials
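
The renaming performed by `_convert_cred` with `convert_to_s3fs=True` is essentially a key translation from the findatapy-style `aws_*` names to the names s3fs (and therefore pandas `storage_options`) expects. A simplified stand-in, not the actual method, which additionally normalises boolean strings and fills missing keys with `None`:

```python
# Simplified sketch of the aws_* -> s3fs key translation
MAPPINGS = {"aws_anon": "anon",
            "aws_access_key": "key",
            "aws_secret_key": "secret",
            "aws_session_token": "token"}

def to_storage_options(cloud_credentials):
    # Missing fields become None, so s3fs can fall back to its own
    # credential chain (environment variables, ~/.aws, IAM role, ...)
    return {s3fs_name: cloud_credentials.get(aws_name)
            for aws_name, s3fs_name in MAPPINGS.items()}

creds = {"aws_anon": False, "aws_access_key": "AKIA...", "aws_secret_key": "..."}
print(to_storage_options(creds))
# {'anon': False, 'key': 'AKIA...', 'secret': '...', 'token': None}
```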

def to_parquet(self, df, path, filename=None, cloud_credentials=constants.cloud_credentials,
def to_parquet(self, df, path, filename=None, cloud_credentials=None,
parquet_compression=constants.parquet_compression, use_pyarrow_directly=False):
"""Write a DataFrame to a local or s3 path as a Parquet file
@@ -1002,6 +1010,8 @@ def to_parquet(self, df, path, filename=None, cloud_credentials=constants.cloud_
"""
logger = LoggerManager.getLogger(__name__)

if cloud_credentials is None: cloud_credentials = DataConstants().cloud_credentials

if isinstance(path, list):
pass
else:
@@ -1039,8 +1049,7 @@ def to_parquet(self, df, path, filename=None, cloud_credentials=constants.cloud_
except:
pass


cloud_credentials = self._convert_cred(cloud_credentials)
cloud_credentials_ = self._convert_cred(cloud_credentials)

# Tends to be slower than using pandas/pyarrow directly, but for very large files, we might have to split
# before writing to disk
@@ -1066,7 +1075,7 @@ def pyarrow_dump(df, path):
counter = 1

if 's3://' in p:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_pyarrow')
s3 = self._create_cloud_filesystem(cloud_credentials_, 's3_pyarrow')

p_in_s3 = p.replace("s3://", "")
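
When the frame is too large for a single pandas write, the pyarrow-direct route converts the DataFrame to a `pyarrow.Table` and writes it through an explicit `pyarrow.fs.S3FileSystem`. A hedged sketch of that route with placeholder bucket and credentials (session token and chunking omitted):

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

# Placeholder credentials; a real run needs valid AWS keys and an existing bucket
s3 = fs.S3FileSystem(access_key="AKIA...", secret_key="...")

df = pd.DataFrame({"EURUSD.close": [1.18, 1.19]},
                  index=pd.date_range("2021-07-22", periods=2, freq="D"))

table = pa.Table.from_pandas(df)

# Note the bucket-relative path (no s3:// prefix) when a filesystem object is supplied
pq.write_table(table, "my-bucket/cache/eurusd.parquet", filesystem=s3)
```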

@@ -1111,9 +1120,17 @@ def pyarrow_dump(df, path):
for p in path:
p = self.sanitize_path(p)

-                df.to_parquet(p, compression=parquet_compression,
-                              coerce_timestamps=constants.default_time_units,
-                              allow_truncated_timestamps=True)
+                if "s3://" in p:
+                    storage_options = self._convert_cred(cloud_credentials, convert_to_s3fs=True)
+
+                    df.to_parquet(p, compression=parquet_compression,
+                                  coerce_timestamps=constants.default_time_units,
+                                  allow_truncated_timestamps=True, storage_options=storage_options)
+                else:
+
+                    df.to_parquet(p, compression=parquet_compression,
+                                  coerce_timestamps=constants.default_time_units,
+                                  allow_truncated_timestamps=True)

except pyarrow.lib.ArrowMemoryError as e:
logger.warning("Couldn't dump using Pandas/pyarrow, will instead try chunking with pyarrow directly " + str(e))
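
The new branch above writes straight to S3 through pandas by passing the converted credentials as `storage_options`. A hedged usage sketch with placeholder bucket and credentials; it needs a recent pandas (with `storage_options` support) and s3fs installed, and the compression and timestamp units shown are examples rather than the findatapy defaults:

```python
import pandas as pd

# Placeholder credentials in s3fs naming
storage_options = {"anon": False, "key": "AKIA...", "secret": "...", "token": None}

df = pd.DataFrame({"EURUSD.close": [1.18, 1.19]},
                  index=pd.date_range("2021-07-22", periods=2, freq="D"))

# pandas forwards storage_options to s3fs; the pyarrow keyword arguments
# mirror those used in the method above
df.to_parquet("s3://my-bucket/cache/eurusd.parquet",
              compression="gzip",
              coerce_timestamps="us",
              allow_truncated_timestamps=True,
              storage_options=storage_options)
```
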
@@ -1195,7 +1212,8 @@ def chunk_dataframes(self, obj, chunk_size_mb=constants.chunk_size_mb):

return obj_list

def read_csv(self, path, cloud_credentials=constants.cloud_credentials, encoding='utf-8', encoding_errors=None, errors='ignore'):
def read_csv(self, path, cloud_credentials=None, encoding='utf-8', encoding_errors=None, errors='ignore'):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')
@@ -1214,7 +1232,8 @@ def read_csv(self, path, cloud_credentials=constants.cloud_credentials, encoding
else:
return pd.read_csv(path, encoding=encoding)

def to_csv(self, df, path, cloud_credentials=constants.cloud_credentials):
def to_csv(self, df, path, cloud_credentials=None):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')
@@ -1227,7 +1246,9 @@ def to_csv(self, df, path, cloud_credentials=constants.cloud_credentials):
else:
df.to_csv(path)
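
`read_csv` and `to_csv` go through an s3fs file handle rather than pandas `storage_options`. A hedged sketch of that s3fs round trip; the bucket, object key and credentials are placeholders, and the exact open/read logic inside `IOEngine` may differ:

```python
import pandas as pd
import s3fs

# Placeholder credentials; s3fs uses the anon/key/secret/token naming
s3 = s3fs.S3FileSystem(anon=False, key="AKIA...", secret="...")

df = pd.DataFrame({"ticker": ["EURUSD"], "close": [1.18]})

# Write then read the same object through s3fs file handles
with s3.open("my-bucket/cache/prices.csv", "w") as f:
    df.to_csv(f, index=False)

with s3.open("my-bucket/cache/prices.csv", "r") as f:
    print(pd.read_csv(f))
```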

def path_exists(self, path, cloud_credentials=constants.cloud_credentials):
def path_exists(self, path, cloud_credentials=None):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

@@ -1257,13 +1278,20 @@ def path_join(self, folder, file):

return folder

def list_files(self, path, cloud_credentials=constants.cloud_credentials):
def list_files(self, path, cloud_credentials=None):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if "s3://" in path:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

path_in_s3 = self.sanitize_path(path).replace("s3://", "")

files = ['s3://' + x for x in s3.glob(path_in_s3)]
list_files = s3.glob(path_in_s3)

if path_in_s3 in list_files:
list_files.remove(path_in_s3)

files = ['s3://' + x for x in list_files]

else:
files = glob.glob(path)
@@ -1272,20 +1300,64 @@ def list_files(self, path, cloud_credentials=constants.cloud_credentials):

return files
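
A hedged usage sketch of `list_files` against an S3 prefix (the change above stops the prefix itself from being returned alongside its contents). The bucket path and credential values are placeholders, and the usual no-argument `IOEngine()` construction is assumed:

```python
from findatapy.market.ioengine import IOEngine

# Placeholder credentials in the findatapy aws_* naming
cloud_credentials = {"aws_anon": False,
                     "aws_access_key": "AKIA...",
                     "aws_secret_key": "..."}

io_engine = IOEngine()

# Wildcards are passed to s3fs.glob; local paths fall back to glob.glob
files = io_engine.list_files("s3://my-bucket/cache/*.parquet",
                             cloud_credentials=cloud_credentials)
print(files)  # e.g. ['s3://my-bucket/cache/eurusd.parquet', ...]
```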

def delete(self, path, cloud_credentials=constants.cloud_credentials):
def delete(self, path, cloud_credentials=None):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

-        if "s3://" in path:
-            s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')
-
-            path_in_s3 = self.sanitize_path(path).replace("s3://", "")
-
-            if self.path_exists(path, cloud_credentials=cloud_credentials):
-                s3.delete(path_in_s3)
-        else:
-            if self.path_exists(path, cloud_credentials=cloud_credentials):
-                os.remove(path)
+        if not(isinstance(path, list)):
+            path = [path]
+
+        for p in path:
+            if "s3://" in p:
+                s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')
+
+                path_in_s3 = self.sanitize_path(p).replace("s3://", "")
+
+                if self.path_exists(path, cloud_credentials=cloud_credentials):
+                    s3.delete(path_in_s3)
+            else:
+                if self.path_exists(p, cloud_credentials=cloud_credentials):
+                    os.remove(p)

def copy(self, source, destination, cloud_credentials=None, infer_dest_filename=False):
if cloud_credentials is None: cloud_credentials = constants.cloud_credentials

if destination is None:
destination = ""
infer_dest_filename = True

if not(isinstance(source, list)):
source = [source]

for so in source:
dest = destination

# Special case for wildcard *
if "*" in so:
# Infer filename if destination in s3
if "s3://" in dest:
infer_dest_filename = True

list_files = self.list_files(so, cloud_credentials=cloud_credentials)

self.copy(list_files, dest, infer_dest_filename=infer_dest_filename)
else:
if infer_dest_filename:
dest = self.path_join(destination, os.path.basename(so))

if "s3://" not in dest and "s3://" not in so:
shutil.copy(so, dest)
else:
s3 = self._create_cloud_filesystem(cloud_credentials, 's3_filesystem')

if "s3://" in dest and "s3://" in so:
s3.cp(self.sanitize_path(so).replace("s3://", ""),
self.sanitize_path(dest).replace("s3://", ""), recursive=True)

elif "s3://" in dest and "s3://" not in so:
s3.put(so, self.sanitize_path(dest).replace("s3://", ""), recursive=True)

elif "s3://" not in dest and "s3://" in so:
s3.get(self.sanitize_path(so).replace("s3://", ""), dest, recursive=True)
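
A hedged usage sketch of the new `copy` method covering local-to-S3 and S3-to-local transfers; bucket names, local paths and credentials are placeholders, and the no-argument `IOEngine()` construction is assumed:

```python
from findatapy.market.ioengine import IOEngine

# Placeholder credentials in the findatapy aws_* naming
cloud_credentials = {"aws_anon": False,
                     "aws_access_key": "AKIA...",
                     "aws_secret_key": "..."}

io_engine = IOEngine()

# Local -> S3: ends up as an s3fs put; infer_dest_filename keeps the file name
io_engine.copy("/tmp/cache/eurusd.parquet", "s3://my-bucket/cache",
               cloud_credentials=cloud_credentials, infer_dest_filename=True)

# S3 -> local: ends up as an s3fs get
io_engine.copy("s3://my-bucket/cache/eurusd.parquet", "/tmp/cache",
               cloud_credentials=cloud_credentials, infer_dest_filename=True)
```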


#######################################################################################################################
