#!/usr/bin/env python3
- import csv,datetime,decimal,json,gzip,os,pyodbc,shutil,string,tempfile,uuid
+ import csv, datetime, decimal, json, gzip, os, pyodbc, shutil, string, tempfile, uuid
import snowflake.connector as sf
import multiprocessing
+
#####################################################################
## Pre-job
## 1) Validate connections
[... 2 lines not shown ...]
## b) Replace environment variables ${something} style
## c) Convert to dict
#####################################################################
- #1
+ # 1
# Test environment variables
try:
    PYODBC_DRIVER = os.environ['PYODBC_DRIVER']
[... 32 lines not shown ...]
# 2
# load job file
with open('job_list.json') as table_list_file:
-     table_list_raw \
-         = table_list_file.read()
-     table_list_template \
-         = string.Template(table_list_raw)
-     job_list_json \
-         = table_list_template.substitute({x: os.environ[x] for x in os.environ})
-     job_list \
-         = json.loads(job_list_json)[0]
+     table_list_raw = table_list_file.read()
+     table_list_template = string.Template(table_list_raw)
+     job_list_json = table_list_template.substitute({x: os.environ[x] for x in os.environ})
+     job_list = json.loads(job_list_json)[0]
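For context, job_list.json itself is not part of this commit. Judging only from the keys the script reads (extract.query, load.database, load.schema, load.table) and the ${...} substitution above, a minimal file might look roughly like this; the job name, query, and variable names here are hypothetical:

    [
      {
        "customers": {
          "extract": {"query": "SELECT * FROM dbo.Customers"},
          "load": {"database": "${SF_DATABASE}", "schema": "STAGING", "table": "CUSTOMERS"}
        }
      }
    ]

The outer list is why the script indexes [0] after json.loads, and string.Template resolves ${SF_DATABASE} from the environment before the JSON is parsed.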
#####################################################################
[... 8 lines not shown ...]
## ...repeat for each job
#####################################################################
def write_data(chunk):
-     path = os.path.join(chunk[0],'')
+     path = os.path.join(chunk[0], '')
    job_name = chunk[1]
    header = chunk[2]
    rows = chunk[3]
    num = multiprocessing.current_process().name[16:]
    filename = f'{path}{job_name}.{num}.csv.gz'
    if os.path.exists(filename) is False:
        with gzip.open(filename, 'at', encoding='utf-8', newline='') as f:
-             csv_writer = csv.writer(f,
-                                     quoting=csv.QUOTE_NONNUMERIC)
+             csv_writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
            csv_writer.writerows(header)
    with gzip.open(filename, 'at', encoding='utf-8', newline='') as f:
-         csv_writer = csv.writer(f,quoting=csv.QUOTE_NONNUMERIC)
+         csv_writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)
        csv_writer.writerows(rows)
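A note on write_data above: each pool worker appends to its own gzip'd CSV, and name[16:] slices the worker number off process names of the form 'SpawnPoolWorker-<n>', since len('SpawnPoolWorker-') == 16. For example:

    >>> 'SpawnPoolWorker-3'[16:]
    '3'

so that worker's chunks land in <job_name>.3.csv.gz. With the 'fork' start method the prefix is 'ForkPoolWorker-' (15 characters) and the slice would drop part of the number, so this reading assumes the spawn start method (the Windows default).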
- #1
+
+ # 1
if __name__ == '__main__':
    src_def = {}
    for job in job_list.keys():
        src_qry = job_list[job]['extract']['query']
-         probe_qry = f"""select * from ({src_qry}) subquery WHERE 0=1"""
+         probe_qry = f"""select * from ({src_qry}) subquery WHERE 0=1"""
        odbc_cursor = pyodbc.connect(odbc_connection_string).cursor()
        odbc_cursor.execute(probe_qry)
        odbc_cursor.fetchone()
        src_def[job] = odbc_cursor.description
        odbc_cursor.close()
-     ## for item in src_def:
-     ##     print(item, src_def[item], '\n')

-     #2
+
+     ## for item in src_def:
+     ##     print(item, src_def[item], '\n')
+
+     # 2
    def get_rows(cursor):
        while True:
            row = cursor.fetchmany(500)
            if len(row) != 0:
                yield row
            else:
                break
+
+
    tgt_sql = {}
    with tempfile.TemporaryDirectory() as tempdir:
        for job_name in job_list.keys():
            print(f'Extracting {job_name}')
            src_qry = job_list[job_name]['extract']['query']
-             header = []
-             header.append( \
-                 tuple(x[0].upper() for x in src_def[job_name]))
-             odbc_cursor = \
-                 pyodbc.connect(odbc_connection_string).cursor()
+             header = [tuple(x[0].upper() for x in src_def[job_name])]
+             odbc_cursor = pyodbc.connect(odbc_connection_string).cursor()
            odbc_cursor.execute(src_qry)

            with multiprocessing.Pool() as p:
                while True:
                    try:
                        rows = next(get_rows(odbc_cursor))
-                         p.map(write_data,(
-                             (tempdir,job_name,header,rows),))
+                         p.map(write_data, ((tempdir, job_name, header, rows),))
                    except StopIteration:
                        break
-             ## pauses the tempdir clean up
-             ## can be used to inspect files
-             ##os.system("pause")
+             ## pauses the tempdir clean up
+             ## can be used to inspect files
+             ##os.system("pause")
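One thing that may read oddly above: rows = next(get_rows(odbc_cursor)) builds a brand-new generator on every pass and takes a single batch from it. It still walks the whole result set because fetchmany(500) advances the one shared cursor, and the loop ends when an empty batch makes the fresh generator break, raising StopIteration. A plainer equivalent, sketched here rather than taken from the repo, would be to iterate one generator directly:

    for rows in get_rows(odbc_cursor):
        p.map(write_data, ((tempdir, job_name, header, rows),))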
-             #####################################################################
-             ## Transform
-             ## 1) Convert source to target definition
-             ## 2) Generate sql for later use
-             #####################################################################
+             #####################################################################
+             ## Transform
+             ## 1) Convert source to target definition
+             ## 2) Generate sql for later use
+             #####################################################################
            with open('type_conversion.json') as tc:
                tc2 = tc.read()
                tc3 = json.loads(tc2)[0]
-                 tc4 = dict([(eval(f'type({k})'),tc3[k]) for k in tc3.keys()])
-                 tgt_def = {}
-                 tgt_def[job_name]=[(col[0],tc4[col[1]]) for col in src_def[job_name]]
-                 col_name = ','.join( \
-                     ['"'+i[0].upper()+'"'+' '+i[1]+'\n ' for i in tgt_def[job_name]])
-                 col_num = ','.join( \
-                     ['t.$'+str(n)+'\n ' for n in range(1,len(tgt_def[job_name])+1)])
+                 tc4 = dict([(eval(f'type({k})'), tc3[k]) for k in tc3.keys()])
+                 tgt_def = {job_name: [(col[0], tc4[col[1]]) for col in src_def[job_name]]}
+                 col_name = ','.join(['"' + i[0].upper() + '"' + ' ' + i[1] + '\n ' for i in tgt_def[job_name]])
+                 col_num = ','.join(['t.$' + str(n) + '\n ' for n in range(1, len(tgt_def[job_name]) + 1)])
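The eval(f'type({k})') lookup above implies that the keys of type_conversion.json are Python literal expressions whose type() matches the type_code objects pyodbc reports in cursor.description, mapped to Snowflake column types. The file itself is not in this commit, so the following is only a guess at its shape (wrapped in a list to match the [0] index), with made-up mappings:

    [
      {
        "0": "NUMBER",
        "0.0": "FLOAT",
        "''": "VARCHAR",
        "decimal.Decimal(0)": "NUMBER(38,10)",
        "datetime.datetime(1970,1,1)": "TIMESTAMP_NTZ"
      }
    ]

With keys like these, eval(f'type({k})') yields int, float, str, decimal.Decimal and datetime.datetime, which is what tc4 is then keyed on when translating each source column's type_code.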
            database = job_list[job_name]['load']['database'].upper()
            schema = job_list[job_name]['load']['schema'].upper()
            table = job_list[job_name]['load']['table'].upper()
-             path = os.path.join(tempdir,'').replace('\\','/')
+             path = os.path.join(tempdir, '').replace('\\', '/')
            stage = job_name.upper()
            tgt_sql[job_name] = [
                f'CREATE DATABASE IF NOT EXISTS "{database}";',
@@ -175,34 +169,34 @@ def get_rows(cursor):
                f'USE DATABASE "{database}";',
                f'USE SCHEMA "{schema}";',
                f'USE WAREHOUSE LOAD_WH;',
-                 f'CREATE OR REPLACE TABLE "{schema}"."{table}"\n(' +
-                 col_name +
-                 ') AS SELECT\n' +
-                 col_num +
-                 f'FROM @"{stage}" t;',
+                 f'CREATE OR REPLACE TABLE "{schema}"."{table}"\n(' +
+                 col_name +
+                 ') AS SELECT\n' +
+                 col_num +
+                 f'FROM @"{stage}" t;',
                f'DROP STAGE "{stage}";'
            ]
-             #print(tgt_sql)
+             # print(tgt_sql)
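To make the string assembly above concrete: for a hypothetical job named customers with two columns that map to NUMBER and VARCHAR, the CREATE OR REPLACE TABLE statement built from col_name and col_num would come out roughly as:

    CREATE OR REPLACE TABLE "STAGING"."CUSTOMERS"
    ("ID" NUMBER
     ,"NAME" VARCHAR
     ) AS SELECT
    t.$1
     ,t.$2
     FROM @"CUSTOMERS" t;

i.e. the staged CSV columns are addressed positionally as t.$1, t.$2, ... and loaded into the named, typed columns of the new table.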
-         #####################################################################
-         ## Load
-         ## 1) Upload files
-         ## 2) Create tables on load
-         #####################################################################
+         #####################################################################
+         ## Load
+         ## 1) Upload files
+         ## 2) Create tables on load
+         #####################################################################
        sf_cursor = sf.connect(**sf_dict).cursor()
        for job_name in job_list.keys():
            print(f'Uploading {job_name}')
            for stmt in tgt_sql[job_name][:7]:
                sf_cursor.execute(stmt)
        sf_cursor.close()
-
+
        sf_cursor = sf.connect(**sf_dict).cursor()
        for job_name in job_list.keys():
            print(f'Loading {job_name}')
            for stmt in tgt_sql[job_name][7:]:
                sf_cursor.execute(stmt)
        for job_name in job_list.keys():
-             print(f'Completed {job_name}')
+             print(f'Completed {job_name}')
        sf_cursor.close()
-
+
### END ###