@@ -24,11 +24,14 @@ use arrow::{
2424} ;
2525use arrow_schema:: Field ;
2626use datafusion:: error:: Result ;
27+ use datafusion:: functions_aggregate:: average:: avg_udaf;
2728use datafusion:: prelude:: * ;
2829use datafusion_common:: ScalarValue ;
29- use datafusion_expr:: function:: WindowUDFFieldArgs ;
30+ use datafusion_expr:: expr:: WindowFunction ;
31+ use datafusion_expr:: function:: { WindowFunctionSimplification , WindowUDFFieldArgs } ;
32+ use datafusion_expr:: simplify:: SimplifyInfo ;
3033use datafusion_expr:: {
31- PartitionEvaluator , Signature , WindowFrame , WindowUDF , WindowUDFImpl ,
34+ Expr , PartitionEvaluator , Signature , WindowFrame , WindowUDF , WindowUDFImpl ,
3235} ;
3336use datafusion_functions_window_common:: partition:: PartitionEvaluatorArgs ;
3437
@@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
142145 }
143146}
144147
148+ /// This UDWF will show how to use the WindowUDFImpl::simplify() API
149+ #[ derive( Debug , Clone ) ]
150+ struct SimplifySmoothItUdf {
151+ signature : Signature ,
152+ }
153+
154+ impl SimplifySmoothItUdf {
155+ fn new ( ) -> Self {
156+ Self {
157+ signature : Signature :: exact (
158+ // this function will always take one arguments of type f64
159+ vec ! [ DataType :: Float64 ] ,
160+ // this function is deterministic and will always return the same
161+ // result for the same input
162+ Volatility :: Immutable ,
163+ ) ,
164+ }
165+ }
166+ }
167+ impl WindowUDFImpl for SimplifySmoothItUdf {
168+ fn as_any ( & self ) -> & dyn Any {
169+ self
170+ }
171+
172+ fn name ( & self ) -> & str {
173+ "simplify_smooth_it"
174+ }
175+
176+ fn signature ( & self ) -> & Signature {
177+ & self . signature
178+ }
179+
180+ fn partition_evaluator (
181+ & self ,
182+ _partition_evaluator_args : PartitionEvaluatorArgs ,
183+ ) -> Result < Box < dyn PartitionEvaluator > > {
184+ todo ! ( )
185+ }
186+
187+ /// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`
188+ /// default implementation will not be called (left as `todo!()`)
189+ fn simplify ( & self ) -> Option < WindowFunctionSimplification > {
190+ let simplify = |window_function : WindowFunction , _: & dyn SimplifyInfo | {
191+ Ok ( Expr :: WindowFunction ( WindowFunction {
192+ fun : datafusion_expr:: WindowFunctionDefinition :: AggregateUDF ( avg_udaf ( ) ) ,
193+ args : window_function. args ,
194+ partition_by : window_function. partition_by ,
195+ order_by : window_function. order_by ,
196+ window_frame : window_function. window_frame ,
197+ null_treatment : window_function. null_treatment ,
198+ } ) )
199+ } ;
200+
201+ Some ( Box :: new ( simplify) )
202+ }
203+
204+ fn field ( & self , field_args : WindowUDFFieldArgs ) -> Result < Field > {
205+ Ok ( Field :: new ( field_args. name ( ) , DataType :: Float64 , true ) )
206+ }
207+ }
208+
145209// create local execution context with `cars.csv` registered as a table named `cars`
146210async fn create_context ( ) -> Result < SessionContext > {
147211 // declare a new context. In spark API, this corresponds to a new spark SQL session
@@ -162,12 +226,15 @@ async fn main() -> Result<()> {
162226 let smooth_it = WindowUDF :: from ( SmoothItUdf :: new ( ) ) ;
163227 ctx. register_udwf ( smooth_it. clone ( ) ) ;
164228
165- // Use SQL to run the new window function
229+ let simplify_smooth_it = WindowUDF :: from ( SimplifySmoothItUdf :: new ( ) ) ;
230+ ctx. register_udwf ( simplify_smooth_it. clone ( ) ) ;
231+
232+ // Use SQL to retrieve entire table
166233 let df = ctx. sql ( "SELECT * from cars" ) . await ?;
167234 // print the results
168235 df. show ( ) . await ?;
169236
170- // Use SQL to run the new window function :
237+ // Use SQL to run smooth_it :
171238 //
172239 // `PARTITION BY car`:each distinct value of car (red, and green)
173240 // should be treated as a separate partition (and will result in
@@ -201,7 +268,7 @@ async fn main() -> Result<()> {
201268 // print the results
202269 df. show ( ) . await ?;
203270
204- // this time, call the new widow function with an explicit
271+ // this time, call the function with an explicit
205272 // window so evaluate will be invoked with each window.
206273 //
207274 // `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
@@ -232,5 +299,22 @@ async fn main() -> Result<()> {
232299 // print the results
233300 df. show ( ) . await ?;
234301
302+ // Use SQL to run simplify_smooth_it
303+ let df = ctx
304+ . sql (
305+ "SELECT \
306+ car, \
307+ speed, \
308+ simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
309+ time \
310+ from cars \
311+ ORDER BY \
312+ car",
313+ )
314+ . await ?;
315+
316+ // print the results
317+ df. show ( ) . await ?;
318+
235319 Ok ( ( ) )
236320}
0 commit comments