Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move PartitionEvaluator and window_state structures to datafusion_expr crate #6690

Merged
merged 5 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use arrow::{
datatypes::{Schema, SchemaBuilder, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::raw::RawTable;
Expand All @@ -62,8 +63,7 @@ use datafusion_common::DataFusionError;
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::hash_utils::create_hashes;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
WindowAggState, WindowState,
PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
};
use datafusion_physical_expr::{
EquivalenceProperties, OrderingEquivalenceProperties, PhysicalExpr,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod literal;
pub mod logical_plan;
mod nullif;
mod operator;
mod partition_evaluator;
mod signature;
pub mod struct_expressions;
mod table_source;
Expand All @@ -51,6 +52,7 @@ mod udf;
pub mod utils;
pub mod window_frame;
pub mod window_function;
pub mod window_state;

pub use accumulator::Accumulator;
pub use aggregate_function::AggregateFunction;
Expand All @@ -69,6 +71,7 @@ pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral};
pub use logical_plan::*;
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
pub use partition_evaluator::PartitionEvaluator;
pub use signature::{Signature, TypeSignature, Volatility};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

//! Partition evaluation module

use crate::window::WindowAggState;
use arrow::array::ArrayRef;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, ScalarValue};
use std::fmt::Debug;
use std::ops::Range;

use crate::window_state::WindowAggState;

/// Partition evaluator for Window Functions
///
/// # Background
///
/// An implementation of this trait is created and used for each
/// partition defined by an `OVER` clause and is instantiated by
/// [`BuiltInWindowFunctionExpr::create_evaluator`]
/// the DataFusion runtime.
///
/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
/// on the following data:
Expand Down Expand Up @@ -65,7 +66,8 @@ use std::ops::Range;
/// ```
///
/// Different methods on this trait will be called depending on the
/// capabilities described by [`BuiltInWindowFunctionExpr`]:
/// capabilities described by [`Self::supports_bounded_execution`],
/// [`Self::uses_window_frame`], and [`Self::include_rank`],
///
/// # Stateless `PartitionEvaluator`
///
Expand Down Expand Up @@ -95,9 +97,6 @@ use std::ops::Range;
/// |false|true|`evaluate` (optionally can also implement `evaluate_all` for more optimized implementation. However, there will be default implementation that is suboptimal) . If we were to implement `ROW_NUMBER` it will end up in this quadrant. Example `OddRowNumber` showcases this use case|
/// |true|false|`evaluate` (I think as long as `uses_window_frame` is `true`. There is no way for `supports_bounded_execution` to be false). I couldn't come up with any example for this quadrant |
/// |true|true|`evaluate`. If we were to implement `FIRST_VALUE`, it would end up in this quadrant|.
///
/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
pub trait PartitionEvaluator: Debug + Send {
/// Updates the internal state for window function
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,100 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides utilities for window frame index calculations
//! depending on the window frame mode: RANGE, ROWS, GROUPS.

use arrow::array::ArrayRef;
use arrow::compute::kernels::sort::SortOptions;
use datafusion_common::utils::{compare_rows, get_row_at_idx, search_in_slice};
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
//! Structures used to hold window function state (for implementing WindowUDFs)

use std::{collections::VecDeque, ops::Range, sync::Arc};

use arrow::{
array::ArrayRef,
compute::{concat, SortOptions},
datatypes::DataType,
record_batch::RecordBatch,
};
use datafusion_common::{
utils::{compare_rows, get_row_at_idx, search_in_slice},
DataFusionError, Result, ScalarValue,
};

use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};

/// Holds the state of evaluating a window function
#[derive(Debug)]
pub struct WindowAggState {
/// The range that we calculate the window function
pub window_frame_range: Range<usize>,
pub window_frame_ctx: Option<WindowFrameContext>,
/// The index of the last row that its result is calculated inside the partition record batch buffer.
pub last_calculated_index: usize,
/// The offset of the deleted row number
pub offset_pruned_rows: usize,
/// Stores the results calculated by window frame
pub out_col: ArrayRef,
/// Keeps track of how many rows should be generated to be in sync with input record_batch.
// (For each row in the input record batch we need to generate a window result).
pub n_row_result_missing: usize,
/// flag indicating whether we have received all data for this partition
pub is_end: bool,
}

impl WindowAggState {
pub fn prune_state(&mut self, n_prune: usize) {
self.window_frame_range = Range {
start: self.window_frame_range.start - n_prune,
end: self.window_frame_range.end - n_prune,
};
self.last_calculated_index -= n_prune;
self.offset_pruned_rows += n_prune;

match self.window_frame_ctx.as_mut() {
// Rows have no state do nothing
Some(WindowFrameContext::Rows(_)) => {}
Some(WindowFrameContext::Range { .. }) => {}
Some(WindowFrameContext::Groups { state, .. }) => {
let mut n_group_to_del = 0;
for (_, end_idx) in &state.group_end_indices {
if n_prune < *end_idx {
break;
}
n_group_to_del += 1;
}
state.group_end_indices.drain(0..n_group_to_del);
state
.group_end_indices
.iter_mut()
.for_each(|(_, start_idx)| *start_idx -= n_prune);
state.current_group_idx -= n_group_to_del;
}
None => {}
};
}

pub fn update(
&mut self,
out_col: &ArrayRef,
partition_batch_state: &PartitionBatchState,
) -> Result<()> {
self.last_calculated_index += out_col.len();
self.out_col = concat(&[&self.out_col, &out_col])?;
self.n_row_result_missing =
partition_batch_state.record_batch.num_rows() - self.last_calculated_index;
self.is_end = partition_batch_state.is_end;
Ok(())
}

pub fn new(out_type: &DataType) -> Result<Self> {
let empty_out_col = ScalarValue::try_from(out_type)?.to_array_of_size(0);
Ok(Self {
window_frame_range: Range { start: 0, end: 0 },
window_frame_ctx: None,
last_calculated_index: 0,
offset_pruned_rows: 0,
out_col: empty_out_col,
n_row_result_missing: 0,
is_end: false,
})
}
}

/// This object stores the window frame state for use in incremental calculations.
#[derive(Debug)]
Expand Down Expand Up @@ -125,7 +206,7 @@ impl WindowFrameContext {
)))
}
WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
min(idx + n as usize, length)
std::cmp::min(idx + n as usize, length)
}
// ERRONEOUS FRAMES
WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
Expand All @@ -150,7 +231,7 @@ impl WindowFrameContext {
// UNBOUNDED FOLLOWING
WindowFrameBound::Following(ScalarValue::UInt64(None)) => length,
WindowFrameBound::Following(ScalarValue::UInt64(Some(n))) => {
min(idx + n as usize + 1, length)
std::cmp::min(idx + n as usize + 1, length)
}
// ERRONEOUS FRAMES
WindowFrameBound::Preceding(_) | WindowFrameBound::Following(_) => {
Expand All @@ -161,6 +242,17 @@ impl WindowFrameContext {
}
}

/// State for each unique partition determined according to PARTITION BY column(s)
#[derive(Debug)]
pub struct PartitionBatchState {
/// The record_batch belonging to current partition
pub record_batch: RecordBatch,
/// Flag indicating whether we have received all data for this partition
pub is_end: bool,
/// Number of rows emitted for each partition
pub n_out_row: usize,
}

/// This structure encapsulates all the state information we require as we scan
/// ranges of data while processing RANGE frames.
/// Attribute `sort_options` stores the column ordering specified by the ORDER
Expand Down Expand Up @@ -510,7 +602,7 @@ impl WindowFrameStateGroups {
Ok(match (SIDE, SEARCH_SIDE) {
// Window frame start:
(true, _) => {
let group_idx = min(group_idx, self.group_end_indices.len());
let group_idx = std::cmp::min(group_idx, self.group_end_indices.len());
if group_idx > 0 {
// Normally, start at the boundary of the previous group.
self.group_end_indices[group_idx - 1].1
Expand All @@ -531,7 +623,7 @@ impl WindowFrameStateGroups {
}
// Window frame end, FOLLOWING n
(false, false) => {
let group_idx = min(
let group_idx = std::cmp::min(
self.current_group_idx + delta,
self.group_end_indices.len() - 1,
);
Expand All @@ -547,11 +639,10 @@ fn check_equality(current: &[ScalarValue], target: &[ScalarValue]) -> Result<boo

#[cfg(test)]
mod tests {
use crate::window::window_frame_state::WindowFrameStateGroups;
use super::*;
use crate::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use arrow::array::{ArrayRef, Float64Array};
use arrow_schema::SortOptions;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use std::ops::Range;
use std::sync::Arc;

Expand Down
7 changes: 3 additions & 4 deletions datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,19 @@ use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::window::window_expr::WindowFn;
use crate::window::{
PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState,
};
use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
use arrow::array::{new_empty_array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::window_state::WindowFrameContext;
use datafusion_expr::WindowFrame;

/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use super::partition_evaluator::PartitionEvaluator;
use crate::equivalence::OrderingEquivalenceBuilder;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/cume_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
//! Defines physical expression for `cume_dist` that can evaluated
//! at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
//! Defines physical expression for `lead` and `lag` that can evaluated
//! at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::LeadLagState;
use crate::window::{BuiltInWindowFunctionExpr, WindowAggState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::cmp::min;
use std::ops::{Neg, Range};
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@ pub(crate) mod cume_dist;
pub(crate) mod lead_lag;
pub(crate) mod nth_value;
pub(crate) mod ntile;
pub(crate) mod partition_evaluator;
pub(crate) mod rank;
pub(crate) mod row_number;
mod sliding_aggregate;
mod window_expr;
mod window_frame_state;

pub use aggregate::PlainAggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use sliding_aggregate::SlidingAggregateWindowExpr;
pub use window_expr::PartitionBatchState;
pub use window_expr::PartitionBatches;
pub use window_expr::PartitionKey;
pub use window_expr::PartitionWindowAggStates;
pub use window_expr::WindowAggState;
pub use window_expr::WindowExpr;
pub use window_expr::WindowState;
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
//! Defines physical expressions for `first_value`, `last_value`, and `nth_value`
//! that can evaluated at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::window_expr::{NthValueKind, NthValueState};
use crate::window::{BuiltInWindowFunctionExpr, WindowAggState};
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::window_state::WindowAggState;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/ntile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
//! Defines physical expression for `ntile` that can evaluated
//! at runtime during query execution

use crate::window::partition_evaluator::PartitionEvaluator;
use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::Field;
use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::PartitionEvaluator;
use std::any::Any;
use std::sync::Arc;

Expand Down
Loading