8  | 8  | import os
9  | 9  | import re
10 | 10 | import uuid
11 |    | -
   | 11 | +import logging
12 | 12 |
13 | 13 | S3_ACCEPTED_KWARGS = [
14 | 14 |     'ACL', 'Body', 'CacheControl ', 'ContentDisposition', 'ContentEncoding', 'ContentLanguage',
19 | 19 | ] # Available parameters for service: https://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.put_object
20 | 20 |
21 | 21 |
   | 22 | +
   | 23 | +logging_config = {
   | 24 | +    'logger_level': logging.INFO,
   | 25 | +    'mask_secrets': True
   | 26 | +}
   | 27 | +log_format = 'Pandas Redshift | %(asctime)s | %(name)s | %(levelname)s | %(message)s'
   | 28 | +logging.basicConfig(level=logging_config['logger_level'], format=log_format)
   | 29 | +logger = logging.getLogger(__name__)
   | 30 | +
   | 31 | +
   | 32 | +def set_log_level(level, mask_secrets=True):
   | 33 | +    log_level_map = {
   | 34 | +        'debug': logging.DEBUG,
   | 35 | +        'info': logging.INFO,
   | 36 | +        'warn': logging.WARN,
   | 37 | +        'error': logging.ERROR
   | 38 | +    }
   | 39 | +    logging_config['logger_level'] = log_level_map[level]
   | 40 | +    logger = logging.getLogger(__name__)
   | 41 | +    logger.setLevel(logging_config['logger_level'])
   | 42 | +    logging_config['mask_secrets'] = mask_secrets
   | 43 | +
   | 44 | +
   | 45 | +def mask_aws_credentials(s):
   | 46 | +    if logging_config['mask_secrets']:
   | 47 | +        import re
   | 48 | +        s = re.sub('(?<=access_key_id \')(.*)(?=\')', '*'*8, s)
   | 49 | +        s = re.sub('(?<=secret_access_key \')(.*)(?=\')', '*'*8, s)
   | 50 | +    return s
   | 51 | +
   | 52 | +
22 | 53 | def connect_to_redshift(dbname, host, user, port=5439, **kwargs):
23 | 54 |     global connect, cursor
24 | 55 |     connect = psycopg2.connect(dbname=dbname,
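
A quick usage note on the block above: set_log_level accepts one of the four mapped names and updates both the module logger and the shared logging_config dict that mask_aws_credentials consults. A minimal sketch, assuming the package is installed with this change and re-exports the new function from its top-level namespace (that re-export is not shown in this diff):

    import pandas_redshift as pr  # assumes set_log_level is reachable from the package namespace

    pr.set_log_level('debug')                      # module logger now emits DEBUG and above
    pr.set_log_level('error', mask_secrets=False)  # raise the threshold and disable credential masking

    # Only 'debug', 'info', 'warn' and 'error' are accepted; any other name
    # (e.g. 'warning') raises KeyError, because it is looked up directly in log_level_map.
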
@@ -103,15 +134,15 @@ def df_to_s3(data_frame, csv_name, index, save_local, delimiter, verbose=True, **kwargs):
103 |     | 134 |     if save_local:
104 |     | 135 |         data_frame.to_csv(csv_name, index=index, sep=delimiter)
105 |     | 136 |         if verbose:
106 |     |     | -            print('saved file {0} in {1}'.format(csv_name, os.getcwd()))
    |     | 137 | +            logger.info('saved file {0} in {1}'.format(csv_name, os.getcwd()))
107 |     | 138 |     #
108 |     | 139 |     csv_buffer = StringIO()
109 |     | 140 |     data_frame.to_csv(csv_buffer, index=index, sep=delimiter)
110 |     | 141 |     s3.Bucket(s3_bucket_var).put_object(
111 |     | 142 |         Key=s3_subdirectory_var + csv_name, Body=csv_buffer.getvalue(),
112 |     | 143 |         **extra_kwargs)
113 |     | 144 |     if verbose:
114 |     |     | -        print('saved file {0} in bucket {1}'.format(
    |     | 145 | +        logger.info('saved file {0} in bucket {1}'.format(
115 |     | 146 |             csv_name, s3_subdirectory_var + csv_name))
116 |     | 147 |
117 |     | 148 |
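
For context on what the converted calls emit: with the basicConfig call added at import time, the two logger.info lines above are rendered with the 'Pandas Redshift | ...' prefix. A standalone sketch, where the logger name stands in for __name__ inside the module and the timestamp and paths are illustrative:

    import logging

    log_format = 'Pandas Redshift | %(asctime)s | %(name)s | %(levelname)s | %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_format)
    logger = logging.getLogger('pandas_redshift')  # stand-in for __name__

    logger.info('saved file {0} in {1}'.format('my_table.csv', '/tmp'))
    # Pandas Redshift | 2018-06-01 12:00:00,000 | pandas_redshift | INFO | saved file my_table.csv in /tmp
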
@@ -181,8 +212,8 @@ def create_redshift_table(data_frame,
181 |     | 212 |             create_table_query += ' interleaved'
182 |     | 213 |         create_table_query += ' sortkey({0})'.format(sortkey)
183 |     | 214 |     if verbose:
184 |     |     | -        print(create_table_query)
185 |     |     | -        print('CREATING A TABLE IN REDSHIFT')
    |     | 215 | +        logger.info(create_table_query)
    |     | 216 | +        logger.info('CREATING A TABLE IN REDSHIFT')
186 |     | 217 |     cursor.execute('drop table if exists {0}'.format(redshift_table_name))
187 |     | 218 |     cursor.execute(create_table_query)
188 |     | 219 |     connect.commit()
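
To make the logged DDL concrete: the two += lines in the context above append the sort key clause, so the statement that logger.info now records ends with an (optionally interleaved) sortkey. A sketch with a made-up column list, since the real query prefix is generated from the data frame; the interleave flag name is only a placeholder for whatever condition guards the ' interleaved' append:

    # made-up prefix; pandas_redshift derives the real column list from the data frame
    create_table_query = 'create table my_table (col_a varchar(256), col_b integer)'
    sortkey = 'col_a'
    interleave = True

    if interleave:
        create_table_query += ' interleaved'
    create_table_query += ' sortkey({0})'.format(sortkey)

    print(create_table_query)
    # create table my_table (col_a varchar(256), col_b integer) interleaved sortkey(col_a)
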
@@ -224,14 +255,14 @@ def s3_to_redshift(redshift_table_name, csv_name, delimiter=',', quotechar='"',
224 |     | 255 |         s3_to_sql = s3_to_sql + "\n\tsession_token '{0}'".format(aws_token)
225 |     | 256 |     s3_to_sql = s3_to_sql + ';'
226 |     | 257 |     if verbose:
227 |     |     | -        print(s3_to_sql)
    |     | 258 | +        logger.info(mask_aws_credentials(s3_to_sql))
228 |     | 259 |     # send the file
229 |     |     | -    print('FILLING THE TABLE IN REDSHIFT')
    |     | 260 | +    logger.info('FILLING THE TABLE IN REDSHIFT')
230 |     | 261 |     try:
231 |     | 262 |         cursor.execute(s3_to_sql)
232 |     | 263 |         connect.commit()
233 |     | 264 |     except Exception as e:
234 |     |     | -        print(e)
    |     | 265 | +        logger.error(e)
235 |     | 266 |         traceback.print_exc(file=sys.stdout)
236 |     | 267 |         connect.rollback()
237 |     | 268 |         raise
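
Worth noting from this hunk: the COPY statement is now passed through mask_aws_credentials before logging, which blanks the access_key_id and secret_access_key values; the session_token line appended at old line 224 / new line 255 is not covered by the two substitutions, so it is logged as-is. A self-contained sketch of the masking, with the mask_secrets toggle omitted and all credential values made up:

    import re

    def mask_aws_credentials(s):
        # same two substitutions as the helper added earlier in this commit
        s = re.sub("(?<=access_key_id ')(.*)(?=')", '*' * 8, s)
        s = re.sub("(?<=secret_access_key ')(.*)(?=')", '*' * 8, s)
        return s

    # rough shape of the COPY statement built in s3_to_redshift (all values made up)
    s3_to_sql = ("copy my_table from 's3://my-bucket/my_table.csv'\n"
                 "\taccess_key_id 'AKIAFAKEEXAMPLE'\n"
                 "\tsecret_access_key 'fakeSecretValue123'\n"
                 "\tsession_token 'fake-session-token';")

    print(mask_aws_credentials(s3_to_sql))
    # copy my_table from 's3://my-bucket/my_table.csv'
    #     access_key_id '********'
    #     secret_access_key '********'
    #     session_token 'fake-session-token';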