Skip to content

Commit 4246082

Browse files
committed
move accumulator and columnar value
1 parent 4b68273 commit 4246082

File tree

5 files changed

+110
-62
lines changed

5 files changed

+110
-62
lines changed

datafusion-expr/src/accumulator.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::ArrayRef;
19+
use datafusion_common::{Result, ScalarValue};
20+
use std::fmt::Debug;
21+
22+
/// An accumulator is a stateful object that lives throughout the evaluation of multiple rows and
23+
/// generically accumulates values.
24+
///
25+
/// An accumulator knows how to:
26+
/// * update its state from inputs via `update_batch`
27+
/// * convert its internal state to a vector of scalar values via `state`
28+
/// * update its state from multiple accumulators' states via `merge_batch`
29+
/// * compute the final value from its internal state via `evaluate`
30+
pub trait Accumulator: Send + Sync + Debug {
31+
/// Returns the state of the accumulator at the end of the accumulation.
32+
/// For example, an average that tracks `sum` and `n` should return a vector
33+
/// of two values: `sum` and `n`.
34+
fn state(&self) -> Result<Vec<ScalarValue>>;
35+
36+
/// Updates the accumulator's state from a vector of arrays.
37+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
38+
39+
/// Updates the accumulator's state from a vector of states.
40+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
41+
42+
/// Returns the accumulator's value based on its current state.
43+
fn evaluate(&self) -> Result<ScalarValue>;
44+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::ArrayRef;
19+
use arrow::array::NullArray;
20+
use arrow::datatypes::DataType;
21+
use arrow::record_batch::RecordBatch;
22+
use datafusion_common::ScalarValue;
23+
use std::sync::Arc;
24+
25+
/// Represents the result of an expression: either an array of values or a single scalar value.
26+
#[derive(Clone)]
27+
pub enum ColumnarValue {
28+
/// An array of values.
29+
Array(ArrayRef),
30+
/// A single scalar value.
31+
Scalar(ScalarValue),
32+
}
33+
34+
impl ColumnarValue {
35+
/// Returns the data type of the wrapped array or scalar value.
pub fn data_type(&self) -> DataType {
36+
match self {
37+
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
38+
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
39+
}
40+
}
41+
42+
/// Converts a columnar value into an `ArrayRef`.
///
/// An `Array` is returned unchanged (`num_rows` is not consulted); a `Scalar`
/// is turned into an array via `to_array_of_size(num_rows)`.
43+
pub fn into_array(self, num_rows: usize) -> ArrayRef {
44+
match self {
45+
ColumnarValue::Array(array) => array,
46+
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
47+
}
48+
}
49+
}
50+
51+
/// Null columnar values are implemented as a null array so that the batch's
52+
/// `num_rows` is carried along with the value.
53+
pub type NullColumnarValue = ColumnarValue;
54+
55+
// NOTE: `NullColumnarValue` is only a type alias, so this implements
// `From<&RecordBatch>` for `ColumnarValue` itself.
impl From<&RecordBatch> for NullColumnarValue {
56+
/// Builds a `ColumnarValue::Array` wrapping a `NullArray` with `batch.num_rows()` rows.
fn from(batch: &RecordBatch) -> Self {
57+
let num_rows = batch.num_rows();
58+
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
59+
}
60+
}

datafusion-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
mod accumulator;
1819
mod aggregate_function;
1920
mod built_in_function;
2021
mod operator;
2122
mod signature;
2223
mod window_frame;
2324
mod window_function;
2425

26+
pub use accumulator::Accumulator;
2527
pub use aggregate_function::AggregateFunction;
2628
pub use built_in_function::BuiltinScalarFunction;
2729
pub use operator::Operator;

datafusion/src/physical_plan/functions.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ use crate::{
4646
scalar::ScalarValue,
4747
};
4848
use arrow::{
49-
array::{ArrayRef, NullArray},
49+
array::ArrayRef,
5050
compute::kernels::length::{bit_length, length},
5151
datatypes::TimeUnit,
5252
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
5353
record_batch::RecordBatch,
5454
};
55+
pub use datafusion_expr::NullColumnarValue;
5556
use fmt::{Debug, Formatter};
5657
use std::convert::From;
5758
use std::{any::Any, fmt, sync::Arc};
@@ -1206,17 +1207,6 @@ impl fmt::Display for ScalarFunctionExpr {
12061207
}
12071208
}
12081209

1209-
/// null columnar values are implemented as a null array in order to pass batch
1210-
/// num_rows
1211-
type NullColumnarValue = ColumnarValue;
1212-
1213-
impl From<&RecordBatch> for NullColumnarValue {
1214-
fn from(batch: &RecordBatch) -> Self {
1215-
let num_rows = batch.num_rows();
1216-
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
1217-
}
1218-
}
1219-
12201210
impl PhysicalExpr for ScalarFunctionExpr {
12211211
/// Return a reference to Any that can be used for downcasting
12221212
fn as_any(&self) -> &dyn Any {

datafusion/src/physical_plan/mod.rs

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ use arrow::error::Result as ArrowResult;
3535
use arrow::record_batch::RecordBatch;
3636
use arrow::{array::ArrayRef, datatypes::Field};
3737
use async_trait::async_trait;
38+
pub use datafusion_expr::Accumulator;
39+
pub use datafusion_expr::ColumnarValue;
3840
pub use display::DisplayFormatType;
3941
use futures::stream::Stream;
4042
use std::fmt;
@@ -419,32 +421,6 @@ pub enum Distribution {
419421
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
420422
}
421423

422-
/// Represents the result from an expression
423-
#[derive(Clone)]
424-
pub enum ColumnarValue {
425-
/// Array of values
426-
Array(ArrayRef),
427-
/// A single value
428-
Scalar(ScalarValue),
429-
}
430-
431-
impl ColumnarValue {
432-
fn data_type(&self) -> DataType {
433-
match self {
434-
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
435-
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
436-
}
437-
}
438-
439-
/// Convert a columnar value into an ArrayRef
440-
pub fn into_array(self, num_rows: usize) -> ArrayRef {
441-
match self {
442-
ColumnarValue::Array(array) => array,
443-
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
444-
}
445-
}
446-
}
447-
448424
/// Expression that can be evaluated against a RecordBatch
449425
/// A Physical expression knows its type, nullability and how to evaluate itself.
450426
pub trait PhysicalExpr: Send + Sync + Display + Debug {
@@ -578,30 +554,6 @@ pub trait WindowExpr: Send + Sync + Debug {
578554
}
579555
}
580556

581-
/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
582-
/// generically accumulates values.
583-
///
584-
/// An accumulator knows how to:
585-
/// * update its state from inputs via `update_batch`
586-
/// * convert its internal state to a vector of scalar values
587-
/// * update its state from multiple accumulators' states via `merge_batch`
588-
/// * compute the final value from its internal state via `evaluate`
589-
pub trait Accumulator: Send + Sync + Debug {
590-
/// Returns the state of the accumulator at the end of the accumulation.
591-
// in the case of an average on which we track `sum` and `n`, this function should return a vector
592-
// of two values, sum and n.
593-
fn state(&self) -> Result<Vec<ScalarValue>>;
594-
595-
/// updates the accumulator's state from a vector of arrays.
596-
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
597-
598-
/// updates the accumulator's state from a vector of states.
599-
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
600-
601-
/// returns its value based on its current state.
602-
fn evaluate(&self) -> Result<ScalarValue>;
603-
}
604-
605557
/// Applies an optional projection to a [`SchemaRef`], returning the
606558
/// projected schema
607559
///

0 commit comments

Comments
 (0)