diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index f430a87e190d..e2432abdc138 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -62,6 +62,7 @@ dashmap = { workspace = true } datafusion = { workspace = true, default-features = true, features = ["avro"] } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions-window-common = { workspace = true } datafusion-optimizer = { workspace = true, default-features = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-proto = { workspace = true } diff --git a/datafusion-examples/examples/advanced_udwf.rs b/datafusion-examples/examples/advanced_udwf.rs index fd1b84070cf6..1c20e292f091 100644 --- a/datafusion-examples/examples/advanced_udwf.rs +++ b/datafusion-examples/examples/advanced_udwf.rs @@ -30,6 +30,7 @@ use datafusion_expr::function::WindowUDFFieldArgs; use datafusion_expr::{ PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl, }; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// This example shows how to use the full WindowUDFImpl API to implement a user /// defined window function. As in the `simple_udwf.rs` example, this struct implements @@ -74,7 +75,10 @@ impl WindowUDFImpl for SmoothItUdf { /// Create a `PartitionEvaluator` to evaluate this function on a new /// partition. - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { Ok(Box::new(MyPartitionEvaluator::new())) } diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs b/datafusion-examples/examples/simplify_udwf_expression.rs index 1ff629eef196..d95f1147bc37 100644 --- a/datafusion-examples/examples/simplify_udwf_expression.rs +++ b/datafusion-examples/examples/simplify_udwf_expression.rs @@ -27,6 +27,7 @@ use datafusion_expr::{ expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl, }; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// This UDWF will show how to use the WindowUDFImpl::simplify() API #[derive(Debug, Clone)] @@ -60,7 +61,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { todo!() } diff --git a/datafusion/core/tests/user_defined/user_defined_window_functions.rs b/datafusion/core/tests/user_defined/user_defined_window_functions.rs index d96bb23953ae..3760328934bc 100644 --- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs @@ -36,6 +36,7 @@ use datafusion_expr::{ PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// A query with a window function evaluated over the entire partition const UNBOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \ @@ -552,7 +553,10 @@ impl OddCounter { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { Ok(Box::new(OddCounter::new(Arc::clone(&self.test_state)))) } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 2975e36488dc..ea053b9fb195 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -27,8 +27,8 @@ use crate::function::{ }; use crate::{ conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery, - AggregateUDF, Expr, LogicalPlan, Operator, ScalarFunctionImplementation, ScalarUDF, - Signature, Volatility, + AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, + ScalarFunctionImplementation, ScalarUDF, Signature, Volatility, }; use crate::{ AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl, @@ -39,6 +39,7 @@ use arrow::compute::kernels::cast_utils::{ use arrow::datatypes::{DataType, Field}; use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference}; use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use sqlparser::ast::NullTreatment; use std::any::Any; use std::fmt::Debug; @@ -658,7 +659,10 @@ impl WindowUDFImpl for SimpleWindowUDF { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { (self.partition_evaluator_factory)() } diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 6459e8f3f7d1..33201f42294a 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -28,14 +28,14 @@ use std::{ use arrow::datatypes::{DataType, Field}; -use datafusion_common::{not_impl_err, Result}; -use datafusion_functions_window_common::field::WindowUDFFieldArgs; - use crate::expr::WindowFunction; use crate::{ function::WindowFunctionSimplification, Documentation, Expr, PartitionEvaluator, Signature, }; +use datafusion_common::{not_impl_err, Result}; +use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// Logical representation of a user-defined window function (UDWF) /// A UDWF is different from a UDF in that it is stateful across batches. @@ -150,8 +150,11 @@ impl WindowUDF { } /// Return a `PartitionEvaluator` for evaluating this window function - pub fn partition_evaluator_factory(&self) -> Result> { - self.inner.partition_evaluator() + pub fn partition_evaluator_factory( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { + self.inner.partition_evaluator(partition_evaluator_args) } /// Returns the field of the final result of evaluating this window function. @@ -218,8 +221,9 @@ where /// # use datafusion_common::{DataFusionError, plan_err, Result}; /// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, WindowFrame, ExprFunctionExt, Documentation}; /// # use datafusion_expr::{WindowUDFImpl, WindowUDF}; -/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +/// # use datafusion_expr::window_doc_sections::DOC_SECTION_ANALYTICAL; /// /// #[derive(Debug, Clone)] /// struct SmoothIt { @@ -254,7 +258,12 @@ where /// fn name(&self) -> &str { "smooth_it" } /// fn signature(&self) -> &Signature { &self.signature } /// // The actual implementation would smooth the window -/// fn partition_evaluator(&self) -> Result> { unimplemented!() } +/// fn partition_evaluator( +/// &self, +/// _partition_evaluator_args: PartitionEvaluatorArgs, +/// ) -> Result> { +/// unimplemented!() +/// } /// fn field(&self, field_args: WindowUDFFieldArgs) -> Result { /// if let Some(DataType::Int32) = field_args.get_input_type(0) { /// Ok(Field::new(field_args.name(), DataType::Int32, false)) @@ -294,7 +303,10 @@ pub trait WindowUDFImpl: Debug + Send + Sync { fn signature(&self) -> &Signature; /// Invoke the function, returning the [`PartitionEvaluator`] instance - fn partition_evaluator(&self) -> Result>; + fn partition_evaluator( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result>; /// Returns any aliases (alternate names) for this function. /// @@ -464,8 +476,11 @@ impl WindowUDFImpl for AliasedWindowUDFImpl { self.inner.signature() } - fn partition_evaluator(&self) -> Result> { - self.inner.partition_evaluator() + fn partition_evaluator( + &self, + partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { + self.inner.partition_evaluator(partition_evaluator_args) } fn aliases(&self) -> &[String] { @@ -546,6 +561,7 @@ mod test { use datafusion_common::Result; use datafusion_expr_common::signature::{Signature, Volatility}; use datafusion_functions_window_common::field::WindowUDFFieldArgs; + use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use std::any::Any; use std::cmp::Ordering; @@ -577,7 +593,10 @@ mod test { fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { unimplemented!() } fn field(&self, _field_args: WindowUDFFieldArgs) -> Result { @@ -613,7 +632,10 @@ mod test { fn signature(&self) -> &Signature { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { unimplemented!() } fn field(&self, _field_args: WindowUDFFieldArgs) -> Result { diff --git a/datafusion/functions-window-common/Cargo.toml b/datafusion/functions-window-common/Cargo.toml index 98b6f8c6dba5..b5df212b7d2a 100644 --- a/datafusion/functions-window-common/Cargo.toml +++ b/datafusion/functions-window-common/Cargo.toml @@ -39,3 +39,4 @@ path = "src/lib.rs" [dependencies] datafusion-common = { workspace = true } +datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/functions-window-common/src/lib.rs b/datafusion/functions-window-common/src/lib.rs index 2e4bcbbc83b9..53f9eb1c9ac6 100644 --- a/datafusion/functions-window-common/src/lib.rs +++ b/datafusion/functions-window-common/src/lib.rs @@ -19,3 +19,4 @@ //! //! [DataFusion]: pub mod field; +pub mod partition; diff --git a/datafusion/functions-window-common/src/partition.rs b/datafusion/functions-window-common/src/partition.rs new file mode 100644 index 000000000000..715c16b87618 --- /dev/null +++ b/datafusion/functions-window-common/src/partition.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::arrow::datatypes::DataType; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use std::sync::Arc; + +pub struct PartitionEvaluatorArgs<'a> { + input_exprs: &'a [Arc], + input_types: &'a [DataType], + is_reversed: bool, + ignore_nulls: bool, +} + +impl<'a> PartitionEvaluatorArgs<'a> { + pub fn new( + input_exprs: &'a [Arc], + input_types: &'a [DataType], + is_reversed: bool, + ignore_nulls: bool, + ) -> Self { + Self { + input_exprs, + input_types, + is_reversed, + ignore_nulls, + } + } + + pub fn input_expr_at(&self, index: usize) -> Option<&Arc> { + self.input_exprs.get(index) + } + + pub fn input_types_at(&self, index: usize) -> Option<&DataType> { + self.input_types.get(index) + } + + pub fn is_reversed(&self) -> bool { + self.is_reversed + } + + pub fn ignore_nulls(&self) -> bool { + self.ignore_nulls + } +} diff --git a/datafusion/functions-window/src/macros.rs b/datafusion/functions-window/src/macros.rs index 843d8ecb38cc..e934f883b101 100644 --- a/datafusion/functions-window/src/macros.rs +++ b/datafusion/functions-window/src/macros.rs @@ -45,6 +45,7 @@ /// # /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; /// # use datafusion_functions_window::get_or_init_udwf; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// /// Defines the `simple_udwf()` user-defined window function. /// get_or_init_udwf!( @@ -80,6 +81,7 @@ /// # } /// # fn partition_evaluator( /// # &self, +/// # _partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -145,6 +147,8 @@ macro_rules! get_or_init_udwf { /// # use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; /// # use datafusion_functions_window::{create_udwf_expr, get_or_init_udwf}; /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; +/// /// # get_or_init_udwf!( /// # RowNumber, /// # row_number, @@ -193,6 +197,7 @@ macro_rules! get_or_init_udwf { /// # } /// # fn partition_evaluator( /// # &self, +/// # _partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -216,6 +221,7 @@ macro_rules! get_or_init_udwf { /// # use datafusion_common::arrow::datatypes::Field; /// # use datafusion_common::ScalarValue; /// # use datafusion_expr::{col, lit}; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// # get_or_init_udwf!(Lead, lead, "user-defined window function"); /// # @@ -278,6 +284,7 @@ macro_rules! get_or_init_udwf { /// # } /// # fn partition_evaluator( /// # &self, +/// # partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -355,6 +362,7 @@ macro_rules! create_udwf_expr { /// # /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; /// # use datafusion_functions_window::{define_udwf_and_expr, get_or_init_udwf, create_udwf_expr}; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// /// 1. Defines the `simple_udwf()` user-defined window function. /// /// @@ -397,6 +405,7 @@ macro_rules! create_udwf_expr { /// # } /// # fn partition_evaluator( /// # &self, +/// # partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -415,6 +424,7 @@ macro_rules! create_udwf_expr { /// # use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl}; /// # use datafusion_functions_window::{create_udwf_expr, define_udwf_and_expr, get_or_init_udwf}; /// # use datafusion_functions_window_common::field::WindowUDFFieldArgs; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// /// 1. Defines the `row_number_udwf()` user-defined window function. /// /// @@ -459,6 +469,7 @@ macro_rules! create_udwf_expr { /// # } /// # fn partition_evaluator( /// # &self, +/// # _partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -484,6 +495,7 @@ macro_rules! create_udwf_expr { /// # use datafusion_common::arrow::datatypes::Field; /// # use datafusion_common::ScalarValue; /// # use datafusion_expr::{col, lit}; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// /// 1. Defines the `lead_udwf()` user-defined window function. /// /// @@ -543,6 +555,7 @@ macro_rules! create_udwf_expr { /// # } /// # fn partition_evaluator( /// # &self, +/// # _partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } @@ -570,6 +583,7 @@ macro_rules! create_udwf_expr { /// # use datafusion_common::arrow::datatypes::Field; /// # use datafusion_common::ScalarValue; /// # use datafusion_expr::{col, lit}; +/// # use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; /// # /// /// 1. Defines the `lead_udwf()` user-defined window function. /// /// @@ -630,6 +644,7 @@ macro_rules! create_udwf_expr { /// # } /// # fn partition_evaluator( /// # &self, +/// # _partition_evaluator_args: PartitionEvaluatorArgs, /// # ) -> datafusion_common::Result> { /// # unimplemented!() /// # } diff --git a/datafusion/functions-window/src/row_number.rs b/datafusion/functions-window/src/row_number.rs index c903f6778ae8..abcee9672df1 100644 --- a/datafusion/functions-window/src/row_number.rs +++ b/datafusion/functions-window/src/row_number.rs @@ -28,6 +28,7 @@ use datafusion_expr::{ Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl, }; use datafusion_functions_window_common::field; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use field::WindowUDFFieldArgs; use std::any::Any; use std::fmt::Debug; @@ -89,7 +90,10 @@ impl WindowUDFImpl for RowNumber { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { Ok(Box::::default()) } diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index a78a54a57123..54526c91fdab 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -1789,6 +1789,8 @@ fn inlist_except(mut l1: InList, l2: &InList) -> Result { #[cfg(test)] mod tests { + use crate::simplify_expressions::SimplifyContext; + use crate::test::test_table_scan_with_name; use datafusion_common::{assert_contains, DFSchemaRef, ToDFSchema}; use datafusion_expr::{ function::{ @@ -1799,15 +1801,13 @@ mod tests { *, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; + use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use std::{ collections::HashMap, ops::{BitAnd, BitOr, BitXor}, sync::Arc, }; - use crate::simplify_expressions::SimplifyContext; - use crate::test::test_table_scan_with_name; - use super::*; // ------------------------------ @@ -3910,7 +3910,10 @@ mod tests { } } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { unimplemented!("not needed for tests") } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 6aafaad0ad77..ff5085a6d9cd 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -52,6 +52,7 @@ mod window_agg_exec; pub use bounded_window_agg_exec::BoundedWindowAggExec; use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, @@ -385,7 +386,13 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr { } fn create_evaluator(&self) -> Result> { - self.fun.partition_evaluator_factory() + self.fun + .partition_evaluator_factory(PartitionEvaluatorArgs::new( + &self.args, + &self.input_types, + self.is_reversed, + self.ignore_nulls, + )) } fn name(&self) -> &str { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index cd789e06dc3b..46b1af3d3221 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -75,6 +75,7 @@ use datafusion_functions_aggregate::expr_fn::{ }; use datafusion_functions_aggregate::string_agg::string_agg; use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec, @@ -2459,7 +2460,10 @@ fn roundtrip_window() { &self.signature } - fn partition_evaluator(&self) -> Result> { + fn partition_evaluator( + &self, + _partition_evaluator_args: PartitionEvaluatorArgs, + ) -> Result> { make_partition_evaluator() }