from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

+ from botocore.client import Config
+ import logging
+ from aws_xray_sdk.core import xray_recorder
+ from aws_xray_sdk.core import patch_all
+ patch_all()
+ logging.basicConfig(level='WARNING')
+ logging.getLogger('aws_xray_sdk').setLevel(logging.ERROR)
+ # collect all tracing samples
+ rules = {"version": 1, "default": {"fixed_target": 1, "rate": 1}}
+ xray_recorder.configure(sampling_rules=rules)
+
+ xray_recorder.begin_segment('Map Reduce Driver')
# create an S3 session
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
- lambda_client = boto3.client('lambda')
+
+ # Setting a longer timeout for reading Lambda results and a larger connection pool
+ lambda_config = Config(read_timeout=120, max_pool_connections=50)
+ lambda_client = boto3.client('lambda', config=lambda_config)

JOB_INFO = 'jobinfo.json'

### UTILS ####
+ @xray_recorder.capture('zipLambda')
def zipLambda(fname, zipname):
    # faster to zip with shell exec
    subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) +
                    glob.glob("lambdautils.py"))

+ @xray_recorder.capture('write_to_s3')
def write_to_s3(bucket, key, data, metadata):
    s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata)

+ @xray_recorder.capture('write_job_config')
def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
    fname = "jobinfo.json";
    with open(fname, 'w') as f:
@@ -70,15 +88,14 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
config = json.loads(open('driverconfig.json', 'r').read())

# 1. Get all keys to be processed
+ xray_recorder.begin_subsegment('Get all keys to be processed')
# init
bucket = config["bucket"]
job_bucket = config["jobBucket"]
region = config["region"]
lambda_memory = config["lambdaMemory"]
concurrent_lambdas = config["concurrentLambdas"]

- #all_keys = s3_client.list_objects(Bucket=bucket, Prefix=config["prefix"])["Contents"]
-
# Fetch all the keys that match the prefix
all_keys = []
for obj in s3.Bucket(bucket).objects.filter(Prefix=config["prefix"]).all():
@@ -87,9 +104,13 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
bsize = lambdautils.compute_batch_size(all_keys, lambda_memory)
batches = lambdautils.batch_creator(all_keys, bsize)
n_mappers = len(batches)
+ document = xray_recorder.current_subsegment()
+ document.put_metadata("Batch size: ", bsize, "Processing initialization")
+ document.put_metadata("Mappers: ", n_mappers, "Processing initialization")
+ xray_recorder.end_subsegment()  # Get all keys to be processed

# 2. Create the lambda functions
-
+ xray_recorder.begin_subsegment('Prepare Lambda functions')
L_PREFIX = "BL"

# Lambda functions
@@ -103,18 +124,24 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
zipLambda(config["mapper"]["name"], config["mapper"]["zip"])
zipLambda(config["reducer"]["name"], config["reducer"]["zip"])
zipLambda(config["reducerCoordinator"]["name"], config["reducerCoordinator"]["zip"])
+ xray_recorder.end_subsegment()  # Prepare Lambda functions

# mapper
+ xray_recorder.begin_subsegment('Create mapper Lambda function')
l_mapper = lambdautils.LambdaManager(lambda_client, s3_client, region, config["mapper"]["zip"], job_id,
        mapper_lambda_name, config["mapper"]["handler"])
l_mapper.update_code_or_create_on_noexist()
+ xray_recorder.end_subsegment()  # Create mapper Lambda function

# Reducer func
+ xray_recorder.begin_subsegment('Create reducer Lambda function')
l_reducer = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducer"]["zip"], job_id,
        reducer_lambda_name, config["reducer"]["handler"])
l_reducer.update_code_or_create_on_noexist()
+ xray_recorder.end_subsegment()  # Create reducer Lambda function

# Coordinator
+ xray_recorder.begin_subsegment('Create reducer coordinator Lambda function')
l_rc = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducerCoordinator"]["zip"], job_id,
        rc_lambda_name, config["reducerCoordinator"]["handler"])
l_rc.update_code_or_create_on_noexist()
@@ -124,29 +151,36 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
# create event source for coordinator
l_rc.create_s3_eventsource_notification(job_bucket)
+ xray_recorder.end_subsegment()  # Create reducer coordinator Lambda function

# Write Jobdata to S3
+ xray_recorder.begin_subsegment('Write job data to S3')
j_key = job_id + "/jobdata";
data = json.dumps({
    "mapCount": n_mappers,
    "totalS3Files": len(all_keys),
    "startTime": time.time()
    })
+ xray_recorder.current_subsegment().put_metadata("Job data: ", data, "Write job data to S3");
write_to_s3(job_bucket, j_key, data, {})
+ xray_recorder.end_subsegment()  # Write job data to S3

### Execute ###

mapper_outputs = []

#2. Invoke Mappers
+ xray_recorder.begin_subsegment('Invoke mappers')
def invoke_lambda(batches, m_id):
+     xray_recorder.begin_segment('Invoke mapper Lambda')
    '''
    lambda invoke function
    '''
    # TODO: Increase timeout

    #batch = [k['Key'] for k in batches[m_id-1]]
    batch = [k.key for k in batches[m_id-1]]
+     xray_recorder.current_segment().put_annotation("batch_for_mapper_" + str(m_id), str(batch));
    #print "invoking", m_id, len(batch)
    resp = lambda_client.invoke(
            FunctionName = mapper_lambda_name,
@@ -162,7 +196,7 @@ def invoke_lambda(batches, m_id):
    out = eval(resp['Payload'].read())
    mapper_outputs.append(out)
    print "mapper output", out
-
+     xray_recorder.end_segment()
# Exec Parallel
print "# of Mappers ", n_mappers
pool = ThreadPool(n_mappers)
@@ -175,16 +209,20 @@ def invoke_lambda(batches, m_id):
    nm = min(concurrent_lambdas, n_mappers)
    results = pool.map(invoke_lambda_partial, Ids[mappers_executed: mappers_executed + nm])
    mappers_executed += nm
+     xray_recorder.current_subsegment().put_metadata("Mapper lambdas executed: ", mappers_executed, "Invoke mappers");

pool.close()
pool.join()

print "all the mappers finished"
+ xray_recorder.end_subsegment()  # Invoke mappers

# Delete Mapper function
+ xray_recorder.begin_subsegment('Delete mappers')
l_mapper.delete_function()
+ xray_recorder.end_subsegment()  # Delete mappers

- ######## COST ######
+ xray_recorder.begin_subsegment('Calculate cost')

# Calculate costs - Approx (since we are using exec time reported by our func and not billed ms)
total_lambda_secs = 0
@@ -247,8 +285,12 @@ def invoke_lambda(batches, m_id):
print "S3 Cost", s3_cost
print "Total Cost: ", lambda_cost + s3_cost
print "Total Lines:", total_lines
-
+ xray_recorder.end_subsegment()  # Calculate cost

# Delete Reducer function
+ xray_recorder.begin_subsegment('Delete reducers')
l_reducer.delete_function()
l_rc.delete_function()
+ xray_recorder.end_subsegment()  # Delete reducers
+
+ xray_recorder.end_segment()  # Map Reduce Driver
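
Note: the instrumentation added above follows one pattern throughout: open a top-level segment for the driver, wrap each phase in a subsegment, attach annotations and metadata so traces are searchable, and close everything in reverse order. The sketch below shows that pattern in isolation; it is not part of the commit. The segment names and the process_batch function are hypothetical, and it assumes an X-Ray daemon is reachable at the SDK's default local address (127.0.0.1:2000).

# Minimal, self-contained sketch of the segment/subsegment pattern used in the
# driver diff above. 'Example Driver', 'Prepare batches', and process_batch()
# are illustrative names only, not part of the repository.
from aws_xray_sdk.core import xray_recorder

# Record every trace, mirroring the sampling rules set in the driver
rules = {"version": 1, "default": {"fixed_target": 1, "rate": 1}}
xray_recorder.configure(sampling_rules=rules)

@xray_recorder.capture('process_batch')  # creates a subsegment around each call
def process_batch(batch_id):
    return len(str(batch_id))

xray_recorder.begin_segment('Example Driver')      # top-level trace segment

xray_recorder.begin_subsegment('Prepare batches')  # one phase of the job
xray_recorder.current_subsegment().put_metadata("Batch size: ", 64, "Processing initialization")
xray_recorder.current_segment().put_annotation("job_id", "example-job")
xray_recorder.end_subsegment()                     # Prepare batches

process_batch(1)                                   # traced via @capture

xray_recorder.end_segment()                        # Example Driver

Because the SDK's context is thread-local, the pool worker threads in the driver cannot see the driver's open segment, which is presumably why invoke_lambda begins its own segment per mapper (producing separate traces) rather than a subsegment of 'Map Reduce Driver'.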