11"""lambda function on AWS Lambda."""
22import boto3
33from urllib .parse import unquote_plus
4- from flow .data_pipeline .data_pipeline import AthenaQuery , delete_obsolete_data , update_baseline
5- from flow . data_pipeline . query import tags , tables , network_using_edge , summary_tables
6- from flow .data_pipeline .query import X_FILTER , EDGE_FILTER , WARMUP_STEPS , HORIZON_STEPS
4+ from flow .data_pipeline .data_pipeline import AthenaQuery , delete_obsolete_data , update_baseline , \
5+ get_ready_queries , get_completed_queries , put_completed_queries
6+ from flow .data_pipeline .query import tables , network_filters , summary_tables , triggers
77
88s3 = boto3 .client ('s3' )
99queryEngine = AthenaQuery ()
1010
1111
1212def lambda_handler (event , context ):
1313 """Handle S3 put event on AWS Lambda."""
+    # stores the set of completed queries for each source_id
+    completed = {}
    records = []
    # do a pre-sweep to handle tasks other than initializing a query
    for record in event['Records']:
@@ -19,58 +21,55 @@ def lambda_handler(event, context):
        table = key.split('/')[0]
        if table not in tables:
            continue
-
        # delete unwanted metadata files
-        if (key[-9:] == '.metadata'):
-            s3.delete_object(Bucket=bucket, Key=key)
-            continue
-
+        s3.delete_object(Bucket=bucket, Key=(key + '.metadata'))
        # load the partition for the newly added table
        query_date = key.split('/')[-3].split('=')[-1]
        partition = key.split('/')[-2].split('=')[-1]
+        source_id = "flow_{}".format(partition.split('_')[1])
+        if table == "fact_vehicle_trace":
+            query_name = "FACT_VEHICLE_TRACE"
+        else:
+            query_name = partition.replace(source_id, "")[1:]
        queryEngine.repair_partition(table, query_date, partition)
-
        # delete obsolete data
        if table in summary_tables:
            delete_obsolete_data(s3, key, table)
-
        # add tables that need to start a query to the list
-        if table in tags.keys():
-            records.append((bucket, key, table, query_date, partition))
+        if query_name in triggers:
+            records.append((bucket, key, table, query_name, query_date, partition, source_id))

    # initialize the queries
-    start_filter = WARMUP_STEPS
-    stop_filter = WARMUP_STEPS + HORIZON_STEPS
-    for bucket, key, table, query_date, partition in records:
-        source_id = "flow_{}".format(partition.split('_')[1])
+    for bucket, key, table, query_name, query_date, partition, source_id in records:
+        # retrieve the set of completed queries for this source_id if not already available
+        if source_id not in completed.keys():
+            completed[source_id] = get_completed_queries(s3, source_id)
+        # if this query was already recorded, skip it; this tolerates repeated executions by Lambda
+        if query_name in completed[source_id]:
+            continue
+        # retrieve metadata and use it to determine the right filters
        metadata_key = "fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv".format(query_date, source_id)
        response = s3.head_object(Bucket=bucket, Key=metadata_key)
-        loc_filter = X_FILTER
        if 'network' in response["Metadata"]:
-            if response["Metadata"]['network'] in network_using_edge:
-                loc_filter = EDGE_FILTER
+            network = response["Metadata"]['network']
+            loc_filter = network_filters[network]['loc_filter']
+            start_filter = network_filters[network]['warmup_steps']
+            stop_filter = network_filters[network]['horizon_steps']
+
+        # update baseline if needed
        if table == 'fact_vehicle_trace' \
                and 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True':
-            update_baseline(s3, response["Metadata"]['network'], source_id)
-
-        query_dict = tags[table]
-
-        # handle different energy models
-        if table == "fact_energy_trace":
-            energy_model_id = partition.replace(source_id, "")[1:]
-            query_dict = tags[energy_model_id]
+            update_baseline(s3, network, source_id)

+        readied_queries = get_ready_queries(completed[source_id], query_name)
+        completed[source_id].add(query_name)
        # initialize queries and store them at appropriate locations
-        for table_name, query_list in query_dict.items():
-            for query_name in query_list:
-                result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
-                                                                                                      query_date,
-                                                                                                      source_id,
-                                                                                                      query_name)
-                queryEngine.run_query(query_name,
-                                      result_location,
-                                      query_date,
-                                      partition,
-                                      loc_filter=loc_filter,
-                                      start_filter=start_filter,
-                                      stop_filter=stop_filter)
+        for readied_query_name, table_name in readied_queries:
+            result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
+                                                                                                  query_date,
+                                                                                                  source_id,
+                                                                                                  readied_query_name)
+            queryEngine.run_query(readied_query_name, result_location, query_date, partition, loc_filter=loc_filter,
+                                  start_filter=start_filter, stop_filter=stop_filter)
+    # store the updated lists of completed queries back to S3
+    put_completed_queries(s3, completed)
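
Note: the handler assumes S3 object keys of the form <table>/date=<query_date>/partition_name=<partition>/<partition>.csv, the same layout used to build metadata_key above. Below is a minimal sketch of how the new code derives its identifiers from such a key; the concrete table, date, run id and energy-model name are made up for illustration and are not taken from the pipeline:

    # hypothetical key following the <table>/date=<date>/partition_name=<partition>/<partition>.csv layout
    key = "fact_energy_trace/date=2021-04-01/partition_name=flow_abc123_POWER_DEMAND_MODEL/" \
          "flow_abc123_POWER_DEMAND_MODEL.csv"

    table = key.split('/')[0]                              # "fact_energy_trace"
    query_date = key.split('/')[-3].split('=')[-1]         # "2021-04-01"
    partition = key.split('/')[-2].split('=')[-1]          # "flow_abc123_POWER_DEMAND_MODEL"
    source_id = "flow_{}".format(partition.split('_')[1])  # "flow_abc123"
    query_name = partition.replace(source_id, "")[1:]      # "POWER_DEMAND_MODEL"

For fact_vehicle_trace the handler hard-codes query_name to "FACT_VEHICLE_TRACE" instead of deriving it from the partition name.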