22
22
import bigframes .core .blocks as blocks
23
23
import bigframes .core .ordering as ordering
24
24
import bigframes .core .window_spec as windows
25
+ import bigframes .dtypes as dtypes
25
26
import bigframes .operations as ops
26
27
import bigframes .operations .aggregations as agg_ops
27
28
@@ -106,67 +107,59 @@ def indicate_duplicates(
106
107
107
108
108
109
def interpolate (block : blocks .Block , method : str = "linear" ) -> blocks .Block :
109
- if method != "linear" :
110
+ supported_methods = [
111
+ "linear" ,
112
+ "values" ,
113
+ "index" ,
114
+ "nearest" ,
115
+ "zero" ,
116
+ "slinear" ,
117
+ ]
118
+ if method not in supported_methods :
110
119
raise NotImplementedError (
111
- f"Only 'linear' interpolate method supported. { constants .FEEDBACK_LINK } "
120
+ f"Method { method } not supported, following interpolate methods supported: { ', ' . join ( supported_methods ) } . { constants .FEEDBACK_LINK } "
112
121
)
113
- backwards_window = windows .WindowSpec (following = 0 )
114
- forwards_window = windows .WindowSpec (preceding = 0 )
115
-
116
122
output_column_ids = []
117
123
118
124
original_columns = block .value_columns
119
125
original_labels = block .column_labels
120
- block , offsets = block .promote_offsets ()
126
+
127
+ if method == "linear" : # Assumes evenly spaced, ignore index
128
+ block , xvalues = block .promote_offsets ()
129
+ else :
130
+ index_columns = block .index_columns
131
+ if len (index_columns ) != 1 :
132
+ raise ValueError ("only method 'linear' supports multi-index" )
133
+ xvalues = block .index_columns [0 ]
134
+ if block .index_dtypes [0 ] not in dtypes .NUMERIC_BIGFRAMES_TYPES :
135
+ raise ValueError ("Can only interpolate on numeric index." )
136
+
121
137
for column in original_columns :
122
138
# null in same places column is null
123
139
should_interpolate = block ._column_type (column ) in [
124
140
pd .Float64Dtype (),
125
141
pd .Int64Dtype (),
126
142
]
127
143
if should_interpolate :
128
- block , notnull = block .apply_unary_op (column , ops .notnull_op )
129
- block , masked_offsets = block .apply_binary_op (
130
- offsets , notnull , ops .partial_arg3 (ops .where_op , None )
131
- )
132
-
133
- block , previous_value = block .apply_window_op (
134
- column , agg_ops .LastNonNullOp (), backwards_window
135
- )
136
- block , next_value = block .apply_window_op (
137
- column , agg_ops .FirstNonNullOp (), forwards_window
138
- )
139
- block , previous_value_offset = block .apply_window_op (
140
- masked_offsets ,
141
- agg_ops .LastNonNullOp (),
142
- backwards_window ,
143
- skip_reproject_unsafe = True ,
144
- )
145
- block , next_value_offset = block .apply_window_op (
146
- masked_offsets ,
147
- agg_ops .FirstNonNullOp (),
148
- forwards_window ,
149
- skip_reproject_unsafe = True ,
150
- )
151
-
152
- block , prediction_id = _interpolate (
144
+ interpolate_method_map = {
145
+ "linear" : "linear" ,
146
+ "values" : "linear" ,
147
+ "index" : "linear" ,
148
+ "slinear" : "linear" ,
149
+ "zero" : "ffill" ,
150
+ "nearest" : "nearest" ,
151
+ }
152
+ extrapolating_methods = ["linear" , "values" , "index" ]
153
+ interpolate_method = interpolate_method_map [method ]
154
+ do_extrapolate = method in extrapolating_methods
155
+ block , interpolated = _interpolate_column (
153
156
block ,
154
- previous_value_offset ,
155
- previous_value ,
156
- next_value_offset ,
157
- next_value ,
158
- offsets ,
157
+ column ,
158
+ xvalues ,
159
+ interpolate_method = interpolate_method ,
160
+ do_extrapolate = do_extrapolate ,
159
161
)
160
-
161
- block , interpolated_column = block .apply_binary_op (
162
- column , prediction_id , ops .fillna_op
163
- )
164
- # Pandas performs ffill-like behavior to extrapolate forwards
165
- block , interpolated_and_ffilled = block .apply_binary_op (
166
- interpolated_column , previous_value , ops .fillna_op
167
- )
168
-
169
- output_column_ids .append (interpolated_and_ffilled )
162
+ output_column_ids .append (interpolated )
170
163
else :
171
164
output_column_ids .append (column )
172
165
@@ -175,7 +168,80 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
175
168
return block .with_column_labels (original_labels )
176
169
177
170
178
- def _interpolate (
171
+ def _interpolate_column (
172
+ block : blocks .Block ,
173
+ column : str ,
174
+ x_values : str ,
175
+ interpolate_method : str ,
176
+ do_extrapolate : bool = True ,
177
+ ) -> typing .Tuple [blocks .Block , str ]:
178
+ if interpolate_method not in ["linear" , "nearest" , "ffill" ]:
179
+ raise ValueError ("interpolate method not supported" )
180
+ window_ordering = (ordering .OrderingColumnReference (x_values ),)
181
+ backwards_window = windows .WindowSpec (following = 0 , ordering = window_ordering )
182
+ forwards_window = windows .WindowSpec (preceding = 0 , ordering = window_ordering )
183
+
184
+ # Note, this method may
185
+ block , notnull = block .apply_unary_op (column , ops .notnull_op )
186
+ block , masked_offsets = block .apply_binary_op (
187
+ x_values , notnull , ops .partial_arg3 (ops .where_op , None )
188
+ )
189
+
190
+ block , previous_value = block .apply_window_op (
191
+ column , agg_ops .LastNonNullOp (), backwards_window
192
+ )
193
+ block , next_value = block .apply_window_op (
194
+ column , agg_ops .FirstNonNullOp (), forwards_window
195
+ )
196
+ block , previous_value_offset = block .apply_window_op (
197
+ masked_offsets ,
198
+ agg_ops .LastNonNullOp (),
199
+ backwards_window ,
200
+ skip_reproject_unsafe = True ,
201
+ )
202
+ block , next_value_offset = block .apply_window_op (
203
+ masked_offsets ,
204
+ agg_ops .FirstNonNullOp (),
205
+ forwards_window ,
206
+ skip_reproject_unsafe = True ,
207
+ )
208
+
209
+ if interpolate_method == "linear" :
210
+ block , prediction_id = _interpolate_points_linear (
211
+ block ,
212
+ previous_value_offset ,
213
+ previous_value ,
214
+ next_value_offset ,
215
+ next_value ,
216
+ x_values ,
217
+ )
218
+ elif interpolate_method == "nearest" :
219
+ block , prediction_id = _interpolate_points_nearest (
220
+ block ,
221
+ previous_value_offset ,
222
+ previous_value ,
223
+ next_value_offset ,
224
+ next_value ,
225
+ x_values ,
226
+ )
227
+ else : # interpolate_method == 'ffill':
228
+ block , prediction_id = _interpolate_points_ffill (
229
+ block ,
230
+ previous_value_offset ,
231
+ previous_value ,
232
+ next_value_offset ,
233
+ next_value ,
234
+ x_values ,
235
+ )
236
+ if do_extrapolate :
237
+ block , prediction_id = block .apply_binary_op (
238
+ prediction_id , previous_value , ops .fillna_op
239
+ )
240
+
241
+ return block .apply_binary_op (column , prediction_id , ops .fillna_op )
242
+
243
+
244
+ def _interpolate_points_linear (
179
245
block : blocks .Block ,
180
246
x0_id : str ,
181
247
y0_id : str ,
@@ -196,6 +262,53 @@ def _interpolate(
196
262
return block , prediction_id
197
263
198
264
265
+ def _interpolate_points_nearest (
266
+ block : blocks .Block ,
267
+ x0_id : str ,
268
+ y0_id : str ,
269
+ x1_id : str ,
270
+ y1_id : str ,
271
+ xpredict_id : str ,
272
+ ) -> typing .Tuple [blocks .Block , str ]:
273
+ """Interpolate by taking the y value of the nearest x value"""
274
+ block , left_diff = block .apply_binary_op (xpredict_id , x0_id , ops .sub_op )
275
+ block , right_diff = block .apply_binary_op (x1_id , xpredict_id , ops .sub_op )
276
+ # If diffs equal, choose left
277
+ block , choose_left = block .apply_binary_op (left_diff , right_diff , ops .le_op )
278
+ block , choose_left = block .apply_unary_op (
279
+ choose_left , ops .partial_right (ops .fillna_op , False )
280
+ )
281
+
282
+ block , nearest = block .apply_ternary_op (y0_id , choose_left , y1_id , ops .where_op )
283
+
284
+ block , y0_exists = block .apply_unary_op (y0_id , ops .notnull_op )
285
+ block , y1_exists = block .apply_unary_op (y1_id , ops .notnull_op )
286
+ block , is_interpolation = block .apply_binary_op (y0_exists , y1_exists , ops .and_op )
287
+
288
+ block , prediction_id = block .apply_binary_op (
289
+ nearest , is_interpolation , ops .partial_arg3 (ops .where_op , None )
290
+ )
291
+
292
+ return block , prediction_id
293
+
294
+
295
+ def _interpolate_points_ffill (
296
+ block : blocks .Block ,
297
+ x0_id : str ,
298
+ y0_id : str ,
299
+ x1_id : str ,
300
+ y1_id : str ,
301
+ xpredict_id : str ,
302
+ ) -> typing .Tuple [blocks .Block , str ]:
303
+ """Interpolates by using the preceding values"""
304
+ # check for existance of y1, otherwise we are extrapolating instead of interpolating
305
+ block , y1_exists = block .apply_unary_op (y1_id , ops .notnull_op )
306
+ block , prediction_id = block .apply_binary_op (
307
+ y0_id , y1_exists , ops .partial_arg3 (ops .where_op , None )
308
+ )
309
+ return block , prediction_id
310
+
311
+
199
312
def drop_duplicates (
200
313
block : blocks .Block , columns : typing .Sequence [str ], keep : str = "first"
201
314
) -> blocks .Block :
0 commit comments