Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion/expr/src/window_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use datafusion_common::{
};

/// Holds the state of evaluating a window function
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct WindowAggState {
/// The range that we calculate the window function
pub window_frame_range: Range<usize>,
Expand Down Expand Up @@ -112,7 +112,7 @@ impl WindowAggState {
}

/// This object stores the window frame state for use in incremental calculations.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum WindowFrameContext {
/// ROWS frames are inherently stateless.
Rows(Arc<WindowFrame>),
Expand Down Expand Up @@ -240,7 +240,7 @@ impl WindowFrameContext {
}

/// State for each unique partition determined according to PARTITION BY column(s)
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionBatchState {
/// The record batch belonging to current partition
pub record_batch: RecordBatch,
Expand Down Expand Up @@ -282,7 +282,7 @@ impl PartitionBatchState {
/// ranges of data while processing RANGE frames.
/// Attribute `sort_options` stores the column ordering specified by the ORDER
/// BY clause. This information is used to calculate the range.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WindowFrameStateRange {
sort_options: Vec<SortOptions>,
}
Expand Down Expand Up @@ -454,7 +454,7 @@ impl WindowFrameStateRange {

/// This structure encapsulates all the state information we require as we
/// scan groups of data while processing window frames.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WindowFrameStateGroups {
/// A tuple containing group values and the row index where the group ends.
/// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
Expand Down
38 changes: 35 additions & 3 deletions datafusion/physical-expr/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,10 +616,42 @@ impl AggregateFunctionExpr {
/// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
pub fn with_new_expressions(
&self,
_args: Vec<Arc<dyn PhysicalExpr>>,
_order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
args: Vec<Arc<dyn PhysicalExpr>>,
order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Option<AggregateFunctionExpr> {
None
if args.len() != self.args.len()
|| (self.order_sensitivity() != AggregateOrderSensitivity::Insensitive
&& order_by_exprs.len() != self.order_bys.len())
{
return None;
}

let new_order_bys = self
.order_bys
.iter()
.zip(order_by_exprs)
.map(|(req, new_expr)| PhysicalSortExpr {
expr: new_expr,
options: req.options,
})
.collect();

Some(AggregateFunctionExpr {
fun: self.fun.clone(),
args,
return_field: Arc::clone(&self.return_field),
name: self.name.clone(),
// TODO: The human-readable name should be updated after the rewrite so that it is not misleading
human_display: self.human_display.clone(),
schema: self.schema.clone(),
order_bys: new_order_bys,
ignore_nulls: self.ignore_nulls,
ordering_fields: self.ordering_fields.clone(),
is_distinct: self.is_distinct,
is_reversed: false,
input_fields: self.input_fields.clone(),
is_nullable: self.is_nullable,
})
}

/// If this function is max, return (output_field, true)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_expr_common::interval_arithmetic::Interval;
use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};

/// Represents a literal value
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Literal {
value: ScalarValue,
field: FieldRef,
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/window/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use crate::aggregate::AggregateFunctionExpr;
use crate::window::standard::add_new_ordering_expr_with_partition_by;
use crate::window::window_expr::AggregateWindowExpr;
use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
use crate::window::{
PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr,
};
Expand Down Expand Up @@ -211,6 +211,10 @@ impl WindowExpr for PlainAggregateWindowExpr {
fn uses_bounded_memory(&self) -> bool {
!self.window_frame.end_bound.is_unbounded()
}

/// Builds the [`WindowFn`] for this plain aggregate window expression.
///
/// Obtains an accumulator via `get_accumulator` and wraps it in the
/// `WindowFn::Aggregate` variant. Any error returned while obtaining the
/// accumulator is propagated to the caller.
fn create_window_fn(&self) -> Result<WindowFn> {
    let accumulator = self.get_accumulator()?;
    Ok(WindowFn::Aggregate(accumulator))
}
}

impl AggregateWindowExpr for PlainAggregateWindowExpr {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/window/sliding_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::ops::Range;
use std::sync::Arc;

use crate::aggregate::AggregateFunctionExpr;
use crate::window::window_expr::AggregateWindowExpr;
use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
use crate::window::{
PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr,
};
Expand Down Expand Up @@ -175,6 +175,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
window_frame: Arc::clone(&self.window_frame),
}))
}

/// Builds the [`WindowFn`] for this sliding aggregate window expression.
///
/// Obtains an accumulator via `get_accumulator` and wraps it in the
/// `WindowFn::Aggregate` variant. Any error returned while obtaining the
/// accumulator is propagated to the caller.
fn create_window_fn(&self) -> Result<WindowFn> {
    let accumulator = self.get_accumulator()?;
    Ok(WindowFn::Aggregate(accumulator))
}
}

impl AggregateWindowExpr for SlidingAggregateWindowExpr {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/src/window/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ impl WindowExpr for StandardWindowExpr {
false
}
}

/// Builds the [`WindowFn`] for this standard window expression.
///
/// Asks the wrapped expression for an evaluator via `create_evaluator` and
/// wraps it in the `WindowFn::Builtin` variant. Any error returned while
/// creating the evaluator is propagated to the caller.
fn create_window_fn(&self) -> Result<WindowFn> {
    let evaluator = self.expr.create_evaluator()?;
    Ok(WindowFn::Builtin(evaluator))
}
}

/// Adds a new ordering expression into existing ordering equivalence class(es) based on
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-expr/src/window/window_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ pub trait WindowExpr: Send + Sync + Debug {
/// Get the reverse expression of this [WindowExpr].
fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;

/// Creates a new instance of the window function evaluator.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a new API too, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this utility becomes useful when updating `WindowAggState`s from deserialized `window_fn` states. If it seems too specific, I can remove it.

///
/// Returns `WindowFn::Builtin` for built-in window functions (e.g., ROW_NUMBER, RANK)
/// or `WindowFn::Aggregate` for aggregate window functions (e.g., SUM, AVG).
fn create_window_fn(&self) -> Result<WindowFn>;

/// Returns all expressions used in the [`WindowExpr`].
/// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
fn all_expressions(&self) -> WindowPhysicalExpressions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::mem::size_of;

use crate::aggregates::group_values::GroupValues;

use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
use datafusion_common::Result;
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
use std::mem::size_of;

/// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
///
Expand All @@ -42,11 +45,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> {
}

impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
fn intern(
&mut self,
cols: &[ArrayRef],
groups: &mut Vec<usize>,
) -> datafusion_common::Result<()> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
assert_eq!(cols.len(), 1);

// look up / add entries in the table
Expand Down Expand Up @@ -85,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
self.num_groups
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl ExecutionPlan for TestMemoryExec {
}

fn as_any(&self) -> &dyn Any {
unimplemented!()
self
}

fn properties(&self) -> &PlanProperties {
Expand Down