Skip to content

Commit e4d6261

Browse files
committed
move more physical expr
1 parent f6bbb62 commit e4d6261

28 files changed

+1288
-1230
lines changed

datafusion-physical-expr/Cargo.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ name = "datafusion_physical_expr"
3333
path = "src/lib.rs"
3434

3535
[features]
36+
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
37+
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
38+
regex_expressions = ["regex"]
39+
unicode_expressions = ["unicode-segmentation"]
3640

3741
[dependencies]
3842
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
@@ -41,3 +45,13 @@ arrow = { version = "9.0.0", features = ["prettyprint"] }
4145
paste = "^1.0"
4246
ahash = { version = "0.7", default-features = false }
4347
ordered-float = "2.10"
48+
lazy_static = { version = "^1.4.0" }
49+
md-5 = { version = "^0.10.0", optional = true }
50+
sha2 = { version = "^0.10.1", optional = true }
51+
blake2 = { version = "^0.10.2", optional = true }
52+
blake3 = { version = "1.0", optional = true }
53+
rand = "0.8"
54+
hashbrown = { version = "0.12", features = ["raw"] }
55+
chrono = { version = "0.4", default-features = false }
56+
regex = { version = "^1.4.3", optional = true }
57+
unicode-segmentation = { version = "^1.7.1", optional = true }

datafusion-physical-expr/src/aggregate_expr.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
// under the License.
1717

1818
use crate::PhysicalExpr;
19-
2019
use arrow::datatypes::Field;
2120
use datafusion_common::Result;
2221
use datafusion_expr::Accumulator;
23-
use std::fmt::Debug;
24-
2522
use std::any::Any;
23+
use std::fmt::Debug;
2624
use std::sync::Arc;
2725

2826
/// An aggregate expression that:

datafusion/src/physical_plan/array_expressions.rs renamed to datafusion-physical-expr/src/array_expressions.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
//! Array expressions
1919
20-
use crate::error::{DataFusionError, Result};
2120
use arrow::array::*;
2221
use arrow::datatypes::DataType;
22+
use datafusion_common::{DataFusionError, Result};
23+
use datafusion_expr::ColumnarValue;
2324
use std::sync::Arc;
2425

25-
use super::ColumnarValue;
26-
2726
macro_rules! downcast_vec {
2827
($ARGS:expr, $ARRAY_TYPE:ident) => {{
2928
$ARGS
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Coercion rules for the arguments of aggregate functions.
19+
20+
use crate::expressions::{
21+
is_approx_percentile_cont_supported_arg_type, is_avg_support_arg_type,
22+
is_correlation_support_arg_type, is_covariance_support_arg_type,
23+
is_stddev_support_arg_type, is_sum_support_arg_type, is_variance_support_arg_type,
24+
try_cast,
25+
};
26+
use crate::PhysicalExpr;
27+
use arrow::datatypes::DataType;
28+
use arrow::datatypes::Schema;
29+
use datafusion_common::{DataFusionError, Result};
30+
use datafusion_expr::AggregateFunction;
31+
use datafusion_expr::{Signature, TypeSignature};
32+
use std::ops::Deref;
33+
use std::sync::Arc;
34+
35+
/// Returns the coerced data type for each `input_types`.
36+
/// Different aggregate function with different input data type will get corresponding coerced data type.
37+
pub fn coerce_types(
38+
agg_fun: &AggregateFunction,
39+
input_types: &[DataType],
40+
signature: &Signature,
41+
) -> Result<Vec<DataType>> {
42+
// Validate input_types matches (at least one of) the func signature.
43+
check_arg_count(agg_fun, input_types, &signature.type_signature)?;
44+
45+
match agg_fun {
46+
AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
47+
Ok(input_types.to_vec())
48+
}
49+
AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
50+
AggregateFunction::Min | AggregateFunction::Max => {
51+
// min and max support the dictionary data type
52+
// unpack the dictionary to get the value
53+
get_min_max_result_type(input_types)
54+
}
55+
AggregateFunction::Sum => {
56+
// Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
57+
// smallint, int, bigint, real, double precision, decimal, or interval.
58+
if !is_sum_support_arg_type(&input_types[0]) {
59+
return Err(DataFusionError::Plan(format!(
60+
"The function {:?} does not support inputs of type {:?}.",
61+
agg_fun, input_types[0]
62+
)));
63+
}
64+
Ok(input_types.to_vec())
65+
}
66+
AggregateFunction::Avg => {
67+
// Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
68+
// smallint, int, bigint, real, double precision, decimal, or interval
69+
if !is_avg_support_arg_type(&input_types[0]) {
70+
return Err(DataFusionError::Plan(format!(
71+
"The function {:?} does not support inputs of type {:?}.",
72+
agg_fun, input_types[0]
73+
)));
74+
}
75+
Ok(input_types.to_vec())
76+
}
77+
AggregateFunction::Variance => {
78+
if !is_variance_support_arg_type(&input_types[0]) {
79+
return Err(DataFusionError::Plan(format!(
80+
"The function {:?} does not support inputs of type {:?}.",
81+
agg_fun, input_types[0]
82+
)));
83+
}
84+
Ok(input_types.to_vec())
85+
}
86+
AggregateFunction::VariancePop => {
87+
if !is_variance_support_arg_type(&input_types[0]) {
88+
return Err(DataFusionError::Plan(format!(
89+
"The function {:?} does not support inputs of type {:?}.",
90+
agg_fun, input_types[0]
91+
)));
92+
}
93+
Ok(input_types.to_vec())
94+
}
95+
AggregateFunction::Covariance => {
96+
if !is_covariance_support_arg_type(&input_types[0]) {
97+
return Err(DataFusionError::Plan(format!(
98+
"The function {:?} does not support inputs of type {:?}.",
99+
agg_fun, input_types[0]
100+
)));
101+
}
102+
Ok(input_types.to_vec())
103+
}
104+
AggregateFunction::CovariancePop => {
105+
if !is_covariance_support_arg_type(&input_types[0]) {
106+
return Err(DataFusionError::Plan(format!(
107+
"The function {:?} does not support inputs of type {:?}.",
108+
agg_fun, input_types[0]
109+
)));
110+
}
111+
Ok(input_types.to_vec())
112+
}
113+
AggregateFunction::Stddev => {
114+
if !is_stddev_support_arg_type(&input_types[0]) {
115+
return Err(DataFusionError::Plan(format!(
116+
"The function {:?} does not support inputs of type {:?}.",
117+
agg_fun, input_types[0]
118+
)));
119+
}
120+
Ok(input_types.to_vec())
121+
}
122+
AggregateFunction::StddevPop => {
123+
if !is_stddev_support_arg_type(&input_types[0]) {
124+
return Err(DataFusionError::Plan(format!(
125+
"The function {:?} does not support inputs of type {:?}.",
126+
agg_fun, input_types[0]
127+
)));
128+
}
129+
Ok(input_types.to_vec())
130+
}
131+
AggregateFunction::Correlation => {
132+
if !is_correlation_support_arg_type(&input_types[0]) {
133+
return Err(DataFusionError::Plan(format!(
134+
"The function {:?} does not support inputs of type {:?}.",
135+
agg_fun, input_types[0]
136+
)));
137+
}
138+
Ok(input_types.to_vec())
139+
}
140+
AggregateFunction::ApproxPercentileCont => {
141+
if !is_approx_percentile_cont_supported_arg_type(&input_types[0]) {
142+
return Err(DataFusionError::Plan(format!(
143+
"The function {:?} does not support inputs of type {:?}.",
144+
agg_fun, input_types[0]
145+
)));
146+
}
147+
if !matches!(input_types[1], DataType::Float64) {
148+
return Err(DataFusionError::Plan(format!(
149+
"The percentile argument for {:?} must be Float64, not {:?}.",
150+
agg_fun, input_types[1]
151+
)));
152+
}
153+
Ok(input_types.to_vec())
154+
}
155+
AggregateFunction::ApproxMedian => {
156+
if !is_approx_percentile_cont_supported_arg_type(&input_types[0]) {
157+
return Err(DataFusionError::Plan(format!(
158+
"The function {:?} does not support inputs of type {:?}.",
159+
agg_fun, input_types[0]
160+
)));
161+
}
162+
Ok(input_types.to_vec())
163+
}
164+
}
165+
}
166+
167+
/// Validate the length of `input_types` matches the `signature` for `agg_fun`.
168+
///
169+
/// This method DOES NOT validate the argument types - only that (at least one,
170+
/// in the case of [`TypeSignature::OneOf`]) signature matches the desired
171+
/// number of input types.
172+
fn check_arg_count(
173+
agg_fun: &AggregateFunction,
174+
input_types: &[DataType],
175+
signature: &TypeSignature,
176+
) -> Result<()> {
177+
match signature {
178+
TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
179+
if input_types.len() != *agg_count {
180+
return Err(DataFusionError::Plan(format!(
181+
"The function {:?} expects {:?} arguments, but {:?} were provided",
182+
agg_fun,
183+
agg_count,
184+
input_types.len()
185+
)));
186+
}
187+
}
188+
TypeSignature::Exact(types) => {
189+
if types.len() != input_types.len() {
190+
return Err(DataFusionError::Plan(format!(
191+
"The function {:?} expects {:?} arguments, but {:?} were provided",
192+
agg_fun,
193+
types.len(),
194+
input_types.len()
195+
)));
196+
}
197+
}
198+
TypeSignature::OneOf(variants) => {
199+
let ok = variants
200+
.iter()
201+
.any(|v| check_arg_count(agg_fun, input_types, v).is_ok());
202+
if !ok {
203+
return Err(DataFusionError::Plan(format!(
204+
"The function {:?} does not accept {:?} function arguments.",
205+
agg_fun,
206+
input_types.len()
207+
)));
208+
}
209+
}
210+
_ => {
211+
return Err(DataFusionError::Internal(format!(
212+
"Aggregate functions do not support this {:?}",
213+
signature
214+
)));
215+
}
216+
}
217+
Ok(())
218+
}
219+
220+
fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
221+
// make sure that the input types only has one element.
222+
assert_eq!(input_types.len(), 1);
223+
// min and max support the dictionary data type
224+
// unpack the dictionary to get the value
225+
match &input_types[0] {
226+
DataType::Dictionary(_, dict_value_type) => {
227+
// TODO add checker, if the value type is complex data type
228+
Ok(vec![dict_value_type.deref().clone()])
229+
}
230+
// TODO add checker for datatype which min and max supported
231+
// For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
232+
_ => Ok(input_types.to_vec()),
233+
}
234+
}
235+
236+
/// Returns the coerced exprs for each `input_exprs`.
237+
/// Get the coerced data type from `aggregate_rule::coerce_types` and add `try_cast` if the
238+
/// data type of `input_exprs` need to be coerced.
239+
pub fn coerce_exprs(
240+
agg_fun: &AggregateFunction,
241+
input_exprs: &[Arc<dyn PhysicalExpr>],
242+
schema: &Schema,
243+
signature: &Signature,
244+
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
245+
if input_exprs.is_empty() {
246+
return Ok(vec![]);
247+
}
248+
let input_types = input_exprs
249+
.iter()
250+
.map(|e| e.data_type(schema))
251+
.collect::<Result<Vec<_>>>()?;
252+
253+
// get the coerced data types
254+
let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
255+
256+
// try cast if need
257+
input_exprs
258+
.iter()
259+
.zip(coerced_types.into_iter())
260+
.map(|(expr, coerced_type)| try_cast(expr.clone(), schema, coerced_type))
261+
.collect::<Result<Vec<_>>>()
262+
}

datafusion-physical-expr/src/coercion_rule/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@
2020
//! Aggregate function rule
2121
//! Binary operation rule
2222
23+
pub mod aggregate_rule;
2324
pub mod binary_rule;

datafusion/src/physical_plan/crypto_expressions.rs renamed to datafusion-physical-expr/src/crypto_expressions.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616
// under the License.
1717

1818
//! Crypto expressions
19-
use super::ColumnarValue;
20-
use crate::{
21-
error::{DataFusionError, Result},
22-
scalar::ScalarValue,
23-
};
19+
2420
use arrow::{
2521
array::{
2622
Array, ArrayRef, BinaryArray, GenericStringArray, StringArray,
@@ -30,6 +26,9 @@ use arrow::{
3026
};
3127
use blake2::{Blake2b512, Blake2s256, Digest};
3228
use blake3::Hasher as Blake3;
29+
use datafusion_common::ScalarValue;
30+
use datafusion_common::{DataFusionError, Result};
31+
use datafusion_expr::ColumnarValue;
3332
use md5::Md5;
3433
use sha2::{Sha224, Sha256, Sha384, Sha512};
3534
use std::any::type_name;

datafusion/src/physical_plan/datetime_expressions.rs renamed to datafusion-physical-expr/src/datetime_expressions.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,7 @@
1616
// under the License.
1717

1818
//! DateTime expressions
19-
use std::sync::Arc;
2019
21-
use super::ColumnarValue;
22-
use crate::{
23-
error::{DataFusionError, Result},
24-
scalar::{ScalarType, ScalarValue},
25-
};
2620
use arrow::{
2721
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
2822
compute::kernels::cast_utils::string_to_timestamp_nanos,
@@ -42,7 +36,11 @@ use arrow::{
4236
};
4337
use chrono::prelude::*;
4438
use chrono::Duration;
39+
use datafusion_common::{DataFusionError, Result};
40+
use datafusion_common::{ScalarType, ScalarValue};
41+
use datafusion_expr::ColumnarValue;
4542
use std::borrow::Borrow;
43+
use std::sync::Arc;
4644

4745
/// given a function `op` that maps a `&str` to a Result of an arrow native type,
4846
/// returns a `PrimitiveArray` after the application

datafusion-physical-expr/src/expressions/cume_dist.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! Defines physical expression for `cume_dist` that can be evaluated
1919
//! at runtime during query execution
2020
21+
use crate::window::partition_evaluator::PartitionEvaluator;
2122
use crate::window::BuiltInWindowFunctionExpr;
22-
use crate::window::PartitionEvaluator;
2323
use crate::PhysicalExpr;
2424
use arrow::array::ArrayRef;
2525
use arrow::array::Float64Array;

datafusion-physical-expr/src/expressions/lead_lag.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! Defines physical expression for `lead` and `lag` that can be evaluated
1919
//! at runtime during query execution
2020
21+
use crate::window::partition_evaluator::PartitionEvaluator;
2122
use crate::window::BuiltInWindowFunctionExpr;
22-
use crate::window::PartitionEvaluator;
2323
use crate::PhysicalExpr;
2424
use arrow::array::ArrayRef;
2525
use arrow::compute::cast;

0 commit comments

Comments
 (0)