This repository was archived by the owner on Dec 6, 2024. It is now read-only.

Replace print with log at different levels, and add the option to mask AWS credentials #50

Merged
merged 1 commit on Jul 2, 2020
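
This change routes the library's console output through Python's standard logging module and adds two controls: set_log_level() picks the verbosity, and a mask_secrets flag hides AWS credentials when the Redshift COPY statement is logged. A minimal usage sketch, assuming the new helpers are exposed at package level like the other core.py functions:

import pandas_redshift as pr

# Show debug-level output; credential masking stays on (the default).
pr.set_log_level('debug')

# Verbose logging with masking turned off: credentials will appear in plain text.
pr.set_log_level('debug', mask_secrets=False)

Note that logging.basicConfig configures the root logger only when no handlers are installed yet, so an application that sets up its own logging before importing pandas_redshift keeps its own handlers and format.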
47 changes: 39 additions & 8 deletions pandas_redshift/core.py
@@ -8,7 +8,7 @@
 import os
 import re
 import uuid
-
+import logging
 
 S3_ACCEPTED_KWARGS = [
     'ACL', 'Body', 'CacheControl ', 'ContentDisposition', 'ContentEncoding', 'ContentLanguage',
@@ -19,6 +19,37 @@
 ] # Available parameters for service: https://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.put_object
 
 
+
+logging_config = {
+    'logger_level': logging.INFO,
+    'mask_secrets': True
+}
+log_format = 'Pandas Redshift | %(asctime)s | %(name)s | %(levelname)s | %(message)s'
+logging.basicConfig(level=logging_config['logger_level'], format=log_format)
+logger = logging.getLogger(__name__)
+
+
+def set_log_level(level, mask_secrets=True):
+    log_level_map = {
+        'debug': logging.DEBUG,
+        'info': logging.INFO,
+        'warn': logging.WARN,
+        'error': logging.ERROR
+    }
+    logging_config['logger_level'] = log_level_map[level]
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging_config['logger_level'])
+    logging_config['mask_secrets'] = mask_secrets
+
+
+def mask_aws_credentials(s):
+    if logging_config['mask_secrets']:
+        import re
+        s = re.sub('(?<=access_key_id \')(.*)(?=\')', '*'*8, s)
+        s = re.sub('(?<=secret_access_key \')(.*)(?=\')', '*'*8, s)
+    return s
+
+
 def connect_to_redshift(dbname, host, user, port=5439, **kwargs):
     global connect, cursor
     connect = psycopg2.connect(dbname=dbname,
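
A note on the masking helper above: the lookbehind/lookahead pair replaces only the quoted credential value, and because . does not match newlines, each substitution stops at the closing quote on its own line of the COPY statement. A quick illustration with dummy keys modeled on the AWS documentation examples (the inner import re is redundant given the module-level import, but harmless):

masked = mask_aws_credentials(
    "copy schema.table from 's3://bucket/file.csv'\n"
    "\taccess_key_id 'AKIAIOSFODNN7EXAMPLE'\n"
    "\tsecret_access_key 'wJalrXUtnFEMIbPxRfiCYEXAMPLEKEY';")
print(masked)
# copy schema.table from 's3://bucket/file.csv'
#     access_key_id '********'
#     secret_access_key '********';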
@@ -103,15 +134,15 @@ def df_to_s3(data_frame, csv_name, index, save_local, delimiter, verbose=True, *
     if save_local:
         data_frame.to_csv(csv_name, index=index, sep=delimiter)
         if verbose:
-            print('saved file {0} in {1}'.format(csv_name, os.getcwd()))
+            logger.info('saved file {0} in {1}'.format(csv_name, os.getcwd()))
     #
     csv_buffer = StringIO()
     data_frame.to_csv(csv_buffer, index=index, sep=delimiter)
     s3.Bucket(s3_bucket_var).put_object(
         Key=s3_subdirectory_var + csv_name, Body=csv_buffer.getvalue(),
         **extra_kwargs)
     if verbose:
-        print('saved file {0} in bucket {1}'.format(
+        logger.info('saved file {0} in bucket {1}'.format(
             csv_name, s3_subdirectory_var + csv_name))
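
One practical effect of the print-to-logger swap in df_to_s3: the verbose messages are no longer hardwired to stdout and can be filtered by level. A hypothetical call, assuming connect_to_s3 has already been run so the module-level S3 globals exist:

import pandas as pd
import pandas_redshift as pr

pr.set_log_level('error')  # silences the INFO messages even with verbose=True
df = pd.DataFrame({'a': [1, 2]})
pr.df_to_s3(df, 'example.csv', index=False, save_local=False, delimiter=',')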


@@ -181,8 +212,8 @@ def create_redshift_table(data_frame,
             create_table_query += ' interleaved'
         create_table_query += ' sortkey({0})'.format(sortkey)
     if verbose:
-        print(create_table_query)
-        print('CREATING A TABLE IN REDSHIFT')
+        logger.info(create_table_query)
+        logger.info('CREATING A TABLE IN REDSHIFT')
     cursor.execute('drop table if exists {0}'.format(redshift_table_name))
     cursor.execute(create_table_query)
     connect.commit()
@@ -224,14 +255,14 @@ def s3_to_redshift(redshift_table_name, csv_name, delimiter=',', quotechar='"',
         s3_to_sql = s3_to_sql + "\n\tsession_token '{0}'".format(aws_token)
     s3_to_sql = s3_to_sql + ';'
     if verbose:
-        print(s3_to_sql)
+        logger.info(mask_aws_credentials(s3_to_sql))
     # send the file
-    print('FILLING THE TABLE IN REDSHIFT')
+    logger.info('FILLING THE TABLE IN REDSHIFT')
     try:
         cursor.execute(s3_to_sql)
         connect.commit()
     except Exception as e:
-        print(e)
+        logger.error(e)
         traceback.print_exc(file=sys.stdout)
         connect.rollback()
         raise
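
Not part of this change, but a natural follow-up: logger.exception() records the active traceback through the logging pipeline itself, which would make the separate traceback.print_exc(file=sys.stdout) side channel unnecessary. A sketch of that variant:

try:
    cursor.execute(s3_to_sql)
    connect.commit()
except Exception:
    # logger.exception logs at ERROR level and appends the current traceback
    logger.exception('COPY into Redshift failed')
    connect.rollback()
    raise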