@@ -91,28 +91,6 @@ def parse_time_pairs() -> TimePair:
     return parse_time_arg()
 
 
-def _handle_lag_issues_as_of(q: QueryBuilder, issues: Optional[TimeValues] = None, lag: Optional[int] = None, as_of: Optional[int] = None):
-    if issues:
-        q.retable(history_table)
-        q.where_integers("issue", issues)
-    elif lag is not None:
-        q.retable(history_table)
-        # history_table has full spectrum of lag values to search from whereas the latest_table does not
-        q.where(lag=lag)
-    elif as_of is not None:
-        # fetch the most recent issue as of a certain date (not to be confused w/ plain-old "most recent issue")
-        q.retable(history_table)
-        sub_condition_asof = "(issue <= :as_of)"
-        q.params["as_of"] = as_of
-        sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
-        sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"
-        sub_condition = f"x.max_issue = {q.alias}.issue AND x.time_type = {q.alias}.time_type AND x.time_value = {q.alias}.time_value AND x.source = {q.alias}.source AND x.signal = {q.alias}.signal AND x.geo_type = {q.alias}.geo_type AND x.geo_value = {q.alias}.geo_value"
-        q.subquery = f"JOIN (SELECT {sub_fields} FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}"
-    else:
-        # else we are using the (standard/default) `latest_table`, to fetch the most recent issue quickly
-        pass
-
-
 @bp.route("/", methods=("GET", "POST"))
 def handle():
     source_signal_pairs = parse_source_signal_pairs()
@@ -132,11 +110,11 @@ def handle():
     fields_float = ["value", "stderr", "sample_size"]
     is_compatibility = is_compatibility_mode()
     if is_compatibility:
-        q.set_order("signal", "time_value", "geo_value", "issue")
+        q.set_sort_order("signal", "time_value", "geo_value", "issue")
     else:
         # transfer also the new detail columns
         fields_string.extend(["source", "geo_type", "time_type"])
-        q.set_order("source", "signal", "time_type", "time_value", "geo_type", "geo_value", "issue")
+        q.set_sort_order("source", "signal", "time_type", "time_value", "geo_type", "geo_value", "issue")
     q.set_fields(fields_string, fields_int, fields_float)
 
     # basic query info
@@ -147,7 +125,9 @@ def handle():
     q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
     q.where_time_pair("time_type", "time_value", time_pair)
 
-    _handle_lag_issues_as_of(q, issues, lag, as_of)
+    q.apply_issues_filter(history_table, issues)
+    q.apply_lag_filter(history_table, lag)
+    q.apply_as_of_filter(history_table, as_of)
 
     def transform_row(row, proxy):
         if is_compatibility or not alias_mapper or "source" not in row:
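
Taken together, the hunks above replace the module-level _handle_lag_issues_as_of helper with three QueryBuilder methods, one per branch of the old if/elif chain. The sketch below is a reconstruction inferred from the deleted helper and the new call sites, not the implementation committed here; only the method names and the (history_table, value) argument shapes are confirmed by this diff. Each method is presumably a no-op when its argument is None or empty, which is why handle() can call all three unconditionally and why call sites that passed all Nones are deleted outright rather than replaced.

from typing import Optional

# Hypothetical reconstruction -- TimeValues and the QueryBuilder internals used
# here (retable, where, where_integers, params, alias, table, conditions_clause,
# subquery) are exactly the ones referenced by the deleted helper above.
class QueryBuilder:
    def apply_issues_filter(self, history_table: str, issues: Optional["TimeValues"] = None) -> "QueryBuilder":
        if issues:
            self.retable(history_table)
            self.where_integers("issue", issues)
        return self

    def apply_lag_filter(self, history_table: str, lag: Optional[int] = None) -> "QueryBuilder":
        if lag is not None:
            # history_table has the full spectrum of lag values; latest_table does not
            self.retable(history_table)
            self.where(lag=lag)
        return self

    def apply_as_of_filter(self, history_table: str, as_of: Optional[int] = None) -> "QueryBuilder":
        if as_of is not None:
            # most recent issue *as of* a date: join each series against its max issue <= as_of
            self.retable(history_table)
            self.params["as_of"] = as_of
            sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
            sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"
            sub_condition = f"x.max_issue = {self.alias}.issue AND x.time_type = {self.alias}.time_type AND x.time_value = {self.alias}.time_value AND x.source = {self.alias}.source AND x.signal = {self.alias}.signal AND x.geo_type = {self.alias}.geo_type AND x.geo_value = {self.alias}.geo_value"
            self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND (issue <= :as_of) GROUP BY {sub_group}) x ON {sub_condition}"
        return self

One behavioral nuance worth noting: the old helper's if/elif made the three filters mutually exclusive, whereas handle() now applies them in sequence, so the refactor presumably relies on the endpoint accepting at most one of issues, lag, and as_of per request.
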
@@ -195,15 +175,12 @@ def handle_trend():
     fields_int = ["time_value"]
     fields_float = ["value"]
     q.set_fields(fields_string, fields_int, fields_float)
-    q.set_order("geo_type", "geo_value", "source", "signal", "time_value")
+    q.set_sort_order("geo_type", "geo_value", "source", "signal", "time_value")
 
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
     q.where_time_pair("time_type", "time_value", time_window)
 
-    # fetch most recent issue fast
-    _handle_lag_issues_as_of(q, None, None, None)
-
     p = create_printer()
 
     def gen(rows):
@@ -246,15 +223,12 @@ def handle_trendseries():
     fields_int = ["time_value"]
     fields_float = ["value"]
     q.set_fields(fields_string, fields_int, fields_float)
-    q.set_order("geo_type", "geo_value", "source", "signal", "time_value")
+    q.set_sort_order("geo_type", "geo_value", "source", "signal", "time_value")
 
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
     q.where_time_pair("time_type", "time_value", time_window)
 
-    # fetch most recent issue fast
-    _handle_lag_issues_as_of(q, None, None, None)
-
     p = create_printer()
 
     shifter = lambda x: shift_day_value(x, -basis_shift)
@@ -303,7 +277,7 @@ def handle_correlation():
     fields_int = ["time_value"]
     fields_float = ["value"]
     q.set_fields(fields_string, fields_int, fields_float)
-    q.set_order("geo_type", "geo_value", "source", "signal", "time_value")
+    q.set_sort_order("geo_type", "geo_value", "source", "signal", "time_value")
 
     q.where_source_signal_pairs(
         "source",
@@ -381,12 +355,12 @@ def handle_export():
     q = QueryBuilder(latest_table, "t")
 
     q.set_fields(["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "source"], [], [])
-    q.set_order("time_value", "geo_value")
+    q.set_sort_order("time_value", "geo_value")
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_time_pair("time_type", "time_value", TimePair("day" if is_day else "week", [(start_day, end_day)]))
     q.where_geo_pairs("geo_type", "geo_value", [GeoPair(geo_type, True if geo_values == "*" else geo_values)])
 
-    _handle_lag_issues_as_of(q, None, None, as_of)
+    q.apply_as_of_filter(history_table, as_of)
 
     format_date = time_value_to_iso if is_day else lambda x: time_value_to_week(x).cdcformat()
     # tag as_of in filename, if it was specified
@@ -459,16 +433,13 @@ def handle_backfill():
     fields_int = ["time_value", "issue"]
     fields_float = ["value", "sample_size"]
     # sort by time value and issue asc
-    q.set_order(time_value=True, issue=True)
+    q.set_sort_order("time_value", "issue")
     q.set_fields(fields_string, fields_int, fields_float, ["is_latest_issue"])
 
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_geo_pairs("geo_type", "geo_value", [geo_pair])
     q.where_time_pair("time_type", "time_value", time_pair)
 
-    # no restriction of issues or dates since we want all issues
-    # _handle_lag_issues_as_of(q, issues, lag, as_of)
-
     p = create_printer()
 
     def find_anchor_row(rows: List[Dict[str, Any]], issue: int) -> Optional[Dict[str, Any]]:
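
The backfill endpoint was the one call site still using keyword arguments (q.set_order(time_value=True, issue=True)); after this change, every call site in the diff passes column names positionally, matching the inline comment ("sort by time value and issue asc"). A minimal sketch of the signature this implies, an assumption rather than the committed implementation:

# Hypothetical sketch of the renamed method, inferred from its call sites in this diff.
def set_sort_order(self, *args: str) -> "QueryBuilder":
    # record the ascending sort columns in the order given; rendered later as ORDER BY
    self.order = args
    return self

Accepting only strings keeps the call sites in this commit uniform and makes the sort columns greppable by name.
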
@@ -642,9 +613,7 @@ def handle_coverage():
     q.where_source_signal_pairs("source", "signal", source_signal_pairs)
     q.where_time_pair("time_type", "time_value", time_window)
     q.group_by = "c.source, c.signal, c.time_value"
-    q.set_order("source", "signal", "time_value")
-
-    _handle_lag_issues_as_of(q, None, None, None)
+    q.set_sort_order("source", "signal", "time_value")
 
     def transform_row(row, proxy):
         if not alias_mapper or "source" not in row: