Skip to content

Commit ad41b2a

Browse files
committed
move more expressions to datafusion-physical-expr
1 parent cca2012 commit ad41b2a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+411
-444
lines changed

datafusion-physical-expr/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,6 @@ path = "src/lib.rs"
3838
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
3939
datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
4040
arrow = { version = "9.0.0", features = ["prettyprint"] }
41+
paste = "^1.0"
42+
ahash = { version = "0.7", default-features = false }
43+
ordered-float = "2.10"

datafusion/src/physical_plan/coercion_rule/binary_rule.rs renamed to datafusion-physical-expr/src/coercion_rule/binary_rule.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
//! Coercion rules for matching argument types for binary operators
1919
20-
use crate::arrow::datatypes::DataType;
21-
use crate::error::{DataFusionError, Result};
22-
use crate::scalar::{MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128};
20+
use arrow::datatypes::DataType;
21+
use datafusion_common::DataFusionError;
22+
use datafusion_common::Result;
23+
use datafusion_common::{MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128};
2324
use datafusion_expr::Operator;
2425

2526
/// Coercion rules for all binary operators. Returns the output type
@@ -492,8 +493,9 @@ fn eq_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
492493
#[cfg(test)]
493494
mod tests {
494495
use super::*;
495-
use crate::arrow::datatypes::DataType;
496-
use crate::error::{DataFusionError, Result};
496+
use arrow::datatypes::DataType;
497+
use datafusion_common::DataFusionError;
498+
use datafusion_common::Result;
497499
use datafusion_expr::Operator;
498500

499501
#[test]

datafusion/src/physical_plan/coercion_rule/mod.rs renamed to datafusion-physical-expr/src/coercion_rule/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,4 @@
2020
//! Aggregate function rule
2121
//! Binary operation rule
2222
23-
pub(crate) mod aggregate_rule;
24-
pub(crate) mod binary_rule;
23+
pub mod binary_rule;

datafusion-physical-expr/src/expressions/approx_distinct.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@
1818
//! Defines physical expressions that can be evaluated at runtime during query execution
1919
2020
use super::format_state_name;
21-
use crate::error::{DataFusionError, Result};
22-
use crate::physical_plan::{
23-
hyperloglog::HyperLogLog, Accumulator, AggregateExpr, PhysicalExpr,
24-
};
25-
use crate::scalar::ScalarValue;
21+
use crate::{hyperloglog::HyperLogLog, AggregateExpr, PhysicalExpr};
2622
use arrow::array::{
2723
ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray,
2824
PrimitiveArray, StringOffsetSizeTrait,
@@ -31,6 +27,9 @@ use arrow::datatypes::{
3127
ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
3228
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
3329
};
30+
use datafusion_common::ScalarValue;
31+
use datafusion_common::{DataFusionError, Result};
32+
use datafusion_expr::Accumulator;
3433
use std::any::type_name;
3534
use std::any::Any;
3635
use std::convert::TryFrom;

datafusion-physical-expr/src/expressions/approx_median.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
//! Defines physical expressions for APPROX_MEDIAN that can be evaluated at runtime during query execution
1919
20+
use crate::{AggregateExpr, PhysicalExpr};
21+
use arrow::{datatypes::DataType, datatypes::Field};
22+
use datafusion_common::Result;
23+
use datafusion_expr::Accumulator;
2024
use std::any::Any;
2125
use std::sync::Arc;
2226

23-
use crate::error::Result;
24-
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
25-
use arrow::{datatypes::DataType, datatypes::Field};
26-
2727
/// APPROX_MEDIAN aggregate expression
2828
#[derive(Debug)]
2929
pub struct ApproxMedian {

datafusion-physical-expr/src/expressions/approx_percentile_cont.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,20 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{any::Any, iter, sync::Arc};
19-
18+
use super::{format_state_name, Literal};
19+
use crate::{tdigest::TDigest, AggregateExpr, PhysicalExpr};
2020
use arrow::{
2121
array::{
2222
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
2323
Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
2424
},
2525
datatypes::{DataType, Field},
2626
};
27-
28-
use crate::{
29-
error::DataFusionError,
30-
physical_plan::{tdigest::TDigest, Accumulator, AggregateExpr, PhysicalExpr},
31-
scalar::ScalarValue,
32-
};
33-
34-
use crate::error::Result;
35-
36-
use super::{format_state_name, Literal};
27+
use datafusion_common::DataFusionError;
28+
use datafusion_common::Result;
29+
use datafusion_common::ScalarValue;
30+
use datafusion_expr::Accumulator;
31+
use std::{any::Any, iter, sync::Arc};
3732

3833
/// Return `true` if `arg_type` is of a [`DataType`] that the
3934
/// [`ApproxPercentileCont`] aggregation can operate on.

datafusion-physical-expr/src/expressions/array_agg.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
//! Defines physical expressions that can be evaluated at runtime during query execution
1919
2020
use super::format_state_name;
21-
use crate::error::{DataFusionError, Result};
22-
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
23-
use crate::scalar::ScalarValue;
21+
use crate::{AggregateExpr, PhysicalExpr};
2422
use arrow::array::ArrayRef;
2523
use arrow::datatypes::{DataType, Field};
24+
use datafusion_common::ScalarValue;
25+
use datafusion_common::{DataFusionError, Result};
26+
use datafusion_expr::Accumulator;
2627
use std::any::Any;
2728
use std::sync::Arc;
2829

@@ -157,18 +158,18 @@ impl Accumulator for ArrayAggAccumulator {
157158
#[cfg(test)]
158159
mod tests {
159160
use super::*;
160-
use crate::from_slice::FromSlice;
161-
use crate::physical_plan::expressions::col;
162-
use crate::physical_plan::expressions::tests::aggregate;
163-
use crate::{error::Result, generic_test_op};
161+
use crate::expressions::col;
162+
use crate::expressions::tests::aggregate;
163+
use crate::generic_test_op;
164164
use arrow::array::ArrayRef;
165165
use arrow::array::Int32Array;
166166
use arrow::datatypes::*;
167167
use arrow::record_batch::RecordBatch;
168+
use datafusion_common::Result;
168169

169170
#[test]
170171
fn array_agg_i32() -> Result<()> {
171-
let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
172+
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
172173

173174
let list = ScalarValue::List(
174175
Some(Box::new(vec![

datafusion-physical-expr/src/expressions/average.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@ use std::any::Any;
2121
use std::convert::TryFrom;
2222
use std::sync::Arc;
2323

24-
use crate::error::{DataFusionError, Result};
25-
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
26-
use crate::scalar::{
27-
ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
28-
};
24+
use crate::{AggregateExpr, PhysicalExpr};
2925
use arrow::compute;
3026
use arrow::datatypes::DataType;
3127
use arrow::{
3228
array::{ArrayRef, UInt64Array},
3329
datatypes::Field,
3430
};
31+
use datafusion_common::{DataFusionError, Result};
32+
use datafusion_common::{
33+
ScalarValue, MAX_PRECISION_FOR_DECIMAL128, MAX_SCALE_FOR_DECIMAL128,
34+
};
35+
use datafusion_expr::Accumulator;
3536

3637
use super::{format_state_name, sum};
3738

@@ -70,7 +71,7 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
7071
}
7172
}
7273

73-
pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
74+
pub fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
7475
matches!(
7576
arg_type,
7677
DataType::UInt8
@@ -215,11 +216,11 @@ impl Accumulator for AvgAccumulator {
215216
#[cfg(test)]
216217
mod tests {
217218
use super::*;
218-
use crate::from_slice::FromSlice;
219-
use crate::physical_plan::expressions::col;
220-
use crate::{error::Result, generic_test_op};
219+
use crate::expressions::col;
220+
use crate::generic_test_op;
221221
use arrow::record_batch::RecordBatch;
222222
use arrow::{array::*, datatypes::*};
223+
use datafusion_common::Result;
223224

224225
#[test]
225226
fn test_avg_return_data_type() -> Result<()> {
@@ -290,7 +291,7 @@ mod tests {
290291

291292
#[test]
292293
fn avg_i32() -> Result<()> {
293-
let a: ArrayRef = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
294+
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
294295
generic_test_op!(
295296
a,
296297
DataType::Int32,
@@ -332,9 +333,8 @@ mod tests {
332333

333334
#[test]
334335
fn avg_u32() -> Result<()> {
335-
let a: ArrayRef = Arc::new(UInt32Array::from_slice(&[
336-
1_u32, 2_u32, 3_u32, 4_u32, 5_u32,
337-
]));
336+
let a: ArrayRef =
337+
Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 5_u32]));
338338
generic_test_op!(
339339
a,
340340
DataType::UInt32,
@@ -346,9 +346,8 @@ mod tests {
346346

347347
#[test]
348348
fn avg_f32() -> Result<()> {
349-
let a: ArrayRef = Arc::new(Float32Array::from_slice(&[
350-
1_f32, 2_f32, 3_f32, 4_f32, 5_f32,
351-
]));
349+
let a: ArrayRef =
350+
Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 5_f32]));
352351
generic_test_op!(
353352
a,
354353
DataType::Float32,
@@ -360,9 +359,8 @@ mod tests {
360359

361360
#[test]
362361
fn avg_f64() -> Result<()> {
363-
let a: ArrayRef = Arc::new(Float64Array::from_slice(&[
364-
1_f64, 2_f64, 3_f64, 4_f64, 5_f64,
365-
]));
362+
let a: ArrayRef =
363+
Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 5_f64]));
366364
generic_test_op!(
367365
a,
368366
DataType::Float64,

datafusion-physical-expr/src/expressions/binary.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ use arrow::datatypes::{ArrowNumericType, DataType, Schema, TimeUnit};
5454
use arrow::error::ArrowError::DivideByZero;
5555
use arrow::record_batch::RecordBatch;
5656

57-
use crate::error::{DataFusionError, Result};
58-
use crate::physical_plan::coercion_rule::binary_rule::coerce_types;
59-
use crate::physical_plan::expressions::try_cast;
60-
use crate::physical_plan::{ColumnarValue, PhysicalExpr};
61-
use crate::scalar::ScalarValue;
57+
use crate::coercion_rule::binary_rule::coerce_types;
58+
use crate::expressions::try_cast;
59+
use crate::PhysicalExpr;
60+
use datafusion_common::ScalarValue;
61+
use datafusion_common::{DataFusionError, Result};
62+
use datafusion_expr::ColumnarValue;
6263
use datafusion_expr::Operator;
6364

6465
// TODO move to arrow_rs
@@ -1375,11 +1376,10 @@ pub fn binary(
13751376
#[cfg(test)]
13761377
mod tests {
13771378
use super::*;
1378-
use crate::error::Result;
1379-
use crate::from_slice::FromSlice;
1380-
use crate::physical_plan::expressions::{col, lit};
1379+
use crate::expressions::{col, lit};
13811380
use arrow::datatypes::{ArrowNumericType, Field, Int32Type, SchemaRef};
13821381
use arrow::util::display::array_value_to_string;
1382+
use datafusion_common::Result;
13831383

13841384
// Create a binary expression without coercion. Used here when we do not want to coerce the expressions
13851385
// to valid types. Usage can result in an execution (after plan) error.
@@ -1398,8 +1398,8 @@ mod tests {
13981398
Field::new("a", DataType::Int32, false),
13991399
Field::new("b", DataType::Int32, false),
14001400
]);
1401-
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
1402-
let b = Int32Array::from_slice(&[1, 2, 4, 8, 16]);
1401+
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1402+
let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
14031403

14041404
// expression: "a < b"
14051405
let lt = binary_simple(
@@ -1432,8 +1432,8 @@ mod tests {
14321432
Field::new("a", DataType::Int32, false),
14331433
Field::new("b", DataType::Int32, false),
14341434
]);
1435-
let a = Int32Array::from_slice(&[2, 4, 6, 8, 10]);
1436-
let b = Int32Array::from_slice(&[2, 5, 4, 8, 8]);
1435+
let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
1436+
let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
14371437

14381438
// expression: "a < b OR a == b"
14391439
let expr = binary_simple(
@@ -1831,14 +1831,14 @@ mod tests {
18311831
Field::new("a", DataType::Int32, false),
18321832
Field::new("b", DataType::Int32, false),
18331833
]);
1834-
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
1835-
let b = Int32Array::from_slice(&[1, 2, 4, 8, 16]);
1834+
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1835+
let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
18361836

18371837
apply_arithmetic::<Int32Type>(
18381838
Arc::new(schema),
18391839
vec![Arc::new(a), Arc::new(b)],
18401840
Operator::Plus,
1841-
Int32Array::from_slice(&[2, 4, 7, 12, 21]),
1841+
Int32Array::from(vec![2, 4, 7, 12, 21]),
18421842
)?;
18431843

18441844
Ok(())
@@ -1850,22 +1850,22 @@ mod tests {
18501850
Field::new("a", DataType::Int32, false),
18511851
Field::new("b", DataType::Int32, false),
18521852
]));
1853-
let a = Arc::new(Int32Array::from_slice(&[1, 2, 4, 8, 16]));
1854-
let b = Arc::new(Int32Array::from_slice(&[1, 2, 3, 4, 5]));
1853+
let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
1854+
let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
18551855

18561856
apply_arithmetic::<Int32Type>(
18571857
schema.clone(),
18581858
vec![a.clone(), b.clone()],
18591859
Operator::Minus,
1860-
Int32Array::from_slice(&[0, 0, 1, 4, 11]),
1860+
Int32Array::from(vec![0, 0, 1, 4, 11]),
18611861
)?;
18621862

18631863
// should handle negative values in the result (for signed types)
18641864
apply_arithmetic::<Int32Type>(
18651865
schema,
18661866
vec![b, a],
18671867
Operator::Minus,
1868-
Int32Array::from_slice(&[0, 0, -1, -4, -11]),
1868+
Int32Array::from(vec![0, 0, -1, -4, -11]),
18691869
)?;
18701870

18711871
Ok(())
@@ -1877,14 +1877,14 @@ mod tests {
18771877
Field::new("a", DataType::Int32, false),
18781878
Field::new("b", DataType::Int32, false),
18791879
]));
1880-
let a = Arc::new(Int32Array::from_slice(&[4, 8, 16, 32, 64]));
1881-
let b = Arc::new(Int32Array::from_slice(&[2, 4, 8, 16, 32]));
1880+
let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
1881+
let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
18821882

18831883
apply_arithmetic::<Int32Type>(
18841884
schema,
18851885
vec![a, b],
18861886
Operator::Multiply,
1887-
Int32Array::from_slice(&[8, 32, 128, 512, 2048]),
1887+
Int32Array::from(vec![8, 32, 128, 512, 2048]),
18881888
)?;
18891889

18901890
Ok(())
@@ -1896,14 +1896,14 @@ mod tests {
18961896
Field::new("a", DataType::Int32, false),
18971897
Field::new("b", DataType::Int32, false),
18981898
]));
1899-
let a = Arc::new(Int32Array::from_slice(&[8, 32, 128, 512, 2048]));
1900-
let b = Arc::new(Int32Array::from_slice(&[2, 4, 8, 16, 32]));
1899+
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
1900+
let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
19011901

19021902
apply_arithmetic::<Int32Type>(
19031903
schema,
19041904
vec![a, b],
19051905
Operator::Divide,
1906-
Int32Array::from_slice(&[4, 8, 16, 32, 64]),
1906+
Int32Array::from(vec![4, 8, 16, 32, 64]),
19071907
)?;
19081908

19091909
Ok(())
@@ -1915,14 +1915,14 @@ mod tests {
19151915
Field::new("a", DataType::Int32, false),
19161916
Field::new("b", DataType::Int32, false),
19171917
]));
1918-
let a = Arc::new(Int32Array::from_slice(&[8, 32, 128, 512, 2048]));
1919-
let b = Arc::new(Int32Array::from_slice(&[2, 4, 7, 14, 32]));
1918+
let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
1919+
let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
19201920

19211921
apply_arithmetic::<Int32Type>(
19221922
schema,
19231923
vec![a, b],
19241924
Operator::Modulo,
1925-
Int32Array::from_slice(&[0, 0, 2, 8, 0]),
1925+
Int32Array::from(vec![0, 0, 2, 8, 0]),
19261926
)?;
19271927

19281928
Ok(())

0 commit comments

Comments
 (0)