Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 89 additions & 5 deletions datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ use arrow::{
};
use arrow_schema::Field;
use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
use datafusion_expr::function::WindowUDFFieldArgs;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
};
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;

Expand Down Expand Up @@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
}
}

/// Example window UDF whose purpose is to demonstrate the
/// `WindowUDFImpl::simplify()` API.
#[derive(Clone, Debug)]
struct SimplifySmoothItUdf {
    /// Argument types and volatility accepted by this function.
    signature: Signature,
}

impl SimplifySmoothItUdf {
    /// Creates the UDF with its fixed signature.
    fn new() -> Self {
        // Exactly one argument of type `Float64` is accepted. The function is
        // deterministic: it always returns the same result for the same
        // input, hence `Volatility::Immutable`.
        let signature = Signature::exact(vec![DataType::Float64], Volatility::Immutable);

        Self { signature }
    }
}
impl WindowUDFImpl for SimplifySmoothItUdf {
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Name under which this function is invoked, e.g. from SQL.
    fn name(&self) -> &str {
        "simplify_smooth_it"
    }

    /// Signature built in [`SimplifySmoothItUdf::new`].
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Deliberately left as `todo!()`: this example relies on `simplify`
    /// rewriting the call, so an evaluator is not expected to be requested.
    fn partition_evaluator(
        &self,
        _partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        todo!()
    }

    /// Rewrites a call to `simplify_smooth_it` into the `avg` aggregate UDF
    /// used as a window function, keeping the original arguments,
    /// partitioning, ordering, window frame and null treatment.
    fn simplify(&self) -> Option<WindowFunctionSimplification> {
        Some(Box::new(
            |original: WindowFunction, _: &dyn SimplifyInfo| {
                // Take apart the incoming call; only its function definition
                // (`fun`) is discarded and replaced below.
                let WindowFunction {
                    args,
                    partition_by,
                    order_by,
                    window_frame,
                    null_treatment,
                    ..
                } = original;

                let rewritten = WindowFunction {
                    fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
                    args,
                    partition_by,
                    order_by,
                    window_frame,
                    null_treatment,
                };

                Ok(Expr::WindowFunction(rewritten))
            },
        ))
    }

    /// Output field: a nullable `Float64` named after the requested field name.
    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
        let nullable = true;
        let output = Field::new(field_args.name(), DataType::Float64, nullable);
        Ok(output)
    }
}

// create local execution context with `cars.csv` registered as a table named `cars`
async fn create_context() -> Result<SessionContext> {
// declare a new context. In the Spark API, this corresponds to a new Spark SQL session
Expand All @@ -162,12 +226,15 @@ async fn main() -> Result<()> {
let smooth_it = WindowUDF::from(SmoothItUdf::new());
ctx.register_udwf(smooth_it.clone());

// Use SQL to run the new window function
let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
ctx.register_udwf(simplify_smooth_it.clone());

// Use SQL to retrieve entire table
let df = ctx.sql("SELECT * from cars").await?;
// print the results
df.show().await?;

// Use SQL to run the new window function:
// Use SQL to run smooth_it:
//
// `PARTITION BY car`: each distinct value of car (red and green)
// should be treated as a separate partition (and will result in
Expand Down Expand Up @@ -201,7 +268,7 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// this time, call the new widow function with an explicit
// this time, call the function with an explicit
// window so evaluate will be invoked with each window.
//
// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
Expand Down Expand Up @@ -232,5 +299,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// Use SQL to run simplify_smooth_it
let df = ctx
.sql(
"SELECT \
car, \
speed, \
simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
time \
from cars \
ORDER BY \
car",
)
.await?;

// print the results
df.show().await?;

Ok(())
}
133 changes: 0 additions & 133 deletions datafusion-examples/examples/simplify_udwf_expression.rs

This file was deleted.

2 changes: 1 addition & 1 deletion datafusion/expr/src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
/// optimizations manually for specific UDFs.
///
/// Example:
/// [`simplify_udwf_expression.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simplify_udwf_expression.rs>
/// [`advanced_udwf.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs>
///
/// # Returns
/// [None] if simplify is not defined or,
Expand Down
Loading