Skip to content

Commit fd596ed

Browse files
authored
build: Switch back to official DataFusion repo and arrow-rs after Arrow Java 16 is released (apache#403)
* build: Switch back to released version of DataFusion and arrow-rs
* Exclude all arrow dependencies from Spark
* Revert "build: Switch back to released version of DataFusion and arrow-rs"

  This reverts commit 29c89bfb25ddf4757ab17f951d3ccf17e55422da.

* Test
* Test
* Test arrow-rs fix
* Fix
* Use DataFusion repo
* Fix
* Fix
* Use 39.0.0-rc1
1 parent c6d387c commit fd596ed

22 files changed

+301
-217
lines changed

core/Cargo.lock

Lines changed: 159 additions & 81 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@ include = [
2929

3030
[dependencies]
3131
parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
32-
arrow = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", features = ["prettyprint", "ffi", "chrono-tz"] }
33-
arrow-array = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
34-
arrow-data = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
35-
arrow-schema = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
36-
arrow-string = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
37-
parquet = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", default-features = false, features = ["experimental"] }
38-
half = { version = "~2.1", default-features = false }
32+
arrow = { version = "52.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
33+
arrow-array = { version = "52.0.0" }
34+
arrow-buffer = { version = "52.0.0" }
35+
arrow-data = { version = "52.0.0" }
36+
arrow-schema = { version = "52.0.0" }
37+
arrow-string = { version = "52.0.0" }
38+
parquet = { version = "52.0.0", default-features = false, features = ["experimental"] }
39+
half = { version = "2.4.1", default-features = false }
3940
futures = "0.3.28"
4041
mimalloc = { version = "*", default-features = false, optional = true }
4142
tokio = { version = "1", features = ["rt-multi-thread"] }
@@ -66,10 +67,12 @@ itertools = "0.11.0"
6667
chrono = { version = "0.4", default-features = false, features = ["clock"] }
6768
chrono-tz = { version = "0.8" }
6869
paste = "1.0.14"
69-
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
70-
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["unicode_expressions", "crypto_expressions"] }
71-
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["crypto_expressions"]}
72-
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", default-features = false, features = ["unicode_expressions"] }
70+
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
71+
datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
72+
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
73+
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
74+
datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
75+
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
7376
unicode-segmentation = "^1.10.1"
7477
once_cell = "1.18.0"
7578
regex = "1.9.6"

core/src/execution/datafusion/expressions/bitwise_not.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ impl PhysicalExpr for BitwiseNotExpr {
105105
}
106106
}
107107

108-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
109-
vec![self.arg.clone()]
108+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
109+
vec![&self.arg]
110110
}
111111

112112
fn with_new_children(

core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ impl PhysicalExpr for BloomFilterMightContain {
129129
})
130130
}
131131

132-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
133-
vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
132+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
133+
vec![&self.bloom_filter_expr, &self.value_expr]
134134
}
135135

136136
fn with_new_children(

core/src/execution/datafusion/expressions/cast.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,8 +1292,8 @@ impl PhysicalExpr for Cast {
12921292
}
12931293
}
12941294

1295-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
1296-
vec![self.child.clone()]
1295+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
1296+
vec![&self.child]
12971297
}
12981298

12991299
fn with_new_children(

core/src/execution/datafusion/expressions/checkoverflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ impl PhysicalExpr for CheckOverflow {
165165
}
166166
}
167167

168-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
169-
vec![self.child.clone()]
168+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
169+
vec![&self.child]
170170
}
171171

172172
fn with_new_children(

core/src/execution/datafusion/expressions/if_expr.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,8 @@ impl PhysicalExpr for IfExpr {
110110
Ok(ColumnarValue::Array(current_value))
111111
}
112112

113-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
114-
vec![
115-
self.if_expr.clone(),
116-
self.true_expr.clone(),
117-
self.false_expr.clone(),
118-
]
113+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
114+
vec![&self.if_expr, &self.true_expr, &self.false_expr]
119115
}
120116

121117
fn with_new_children(
@@ -225,8 +221,8 @@ mod tests {
225221
let true_expr = lit(123i32);
226222
let false_expr = lit(999i32);
227223

228-
let expr = if_fn(if_expr, true_expr, false_expr);
229-
let children = expr.unwrap().children();
224+
let expr = if_fn(if_expr, true_expr, false_expr).unwrap();
225+
let children = expr.children();
230226
assert_eq!(children.len(), 3);
231227
assert_eq!(children[0].to_string(), "true");
232228
assert_eq!(children[1].to_string(), "123");

core/src/execution/datafusion/expressions/negative.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
use crate::errors::CometError;
1919
use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType};
2020
use arrow_array::RecordBatch;
21+
use arrow_buffer::IntervalDayTime;
2122
use arrow_schema::{DataType, Schema};
2223
use datafusion::{
2324
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
2425
physical_expr::PhysicalExpr,
2526
};
2627
use datafusion_common::{Result, ScalarValue};
27-
use datafusion_physical_expr::{
28-
aggregate::utils::down_cast_any_ref, sort_properties::SortProperties,
29-
};
28+
use datafusion_expr::sort_properties::ExprProperties;
29+
use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
3030
use std::{
3131
any::Any,
3232
hash::{Hash, Hasher},
@@ -63,7 +63,7 @@ macro_rules! check_overflow {
6363
for i in 0..typed_array.len() {
6464
if typed_array.value(i) == $min_val {
6565
if $type_name == "byte" || $type_name == "short" {
66-
let value = typed_array.value(i).to_string() + " caused";
66+
let value = format!("{:?} caused", typed_array.value(i));
6767
return Err(arithmetic_overflow_error(value.as_str()).into());
6868
}
6969
return Err(arithmetic_overflow_error($type_name).into());
@@ -135,7 +135,7 @@ impl PhysicalExpr for NegativeExpr {
135135
arrow::datatypes::IntervalUnit::DayTime => check_overflow!(
136136
array,
137137
arrow::array::IntervalDayTimeArray,
138-
i64::MIN,
138+
IntervalDayTime::MIN,
139139
"interval"
140140
),
141141
arrow::datatypes::IntervalUnit::MonthDayNano => {
@@ -195,8 +195,8 @@ impl PhysicalExpr for NegativeExpr {
195195
}
196196
}
197197

198-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
199-
vec![self.arg.clone()]
198+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
199+
vec![&self.arg]
200200
}
201201

202202
fn with_new_children(
@@ -255,8 +255,9 @@ impl PhysicalExpr for NegativeExpr {
255255
}
256256

257257
/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
258-
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
259-
-children[0]
258+
fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
259+
let properties = children[0].clone().with_order(children[0].sort_properties);
260+
Ok(properties)
260261
}
261262
}
262263

core/src/execution/datafusion/expressions/normalize_nan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl PhysicalExpr for NormalizeNaNAndZero {
7777
}
7878
}
7979

80-
fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
80+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
8181
self.child.children()
8282
}
8383

core/src/execution/datafusion/expressions/scalar_funcs.rs

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::{
1919
any::Any,
2020
cmp::min,
2121
fmt::{Debug, Write},
22-
str::FromStr,
2322
sync::Arc,
2423
};
2524

@@ -35,17 +34,15 @@ use arrow_array::{Array, ArrowNativeTypeOp, Decimal128Array, StringArray};
3534
use arrow_schema::DataType;
3635
use datafusion::{
3736
execution::FunctionRegistry,
38-
logical_expr::{
39-
BuiltinScalarFunction, ScalarFunctionDefinition, ScalarFunctionImplementation,
40-
ScalarUDFImpl, Signature, Volatility,
41-
},
37+
functions::math::round::round,
38+
logical_expr::{ScalarFunctionImplementation, ScalarUDFImpl, Signature, Volatility},
4239
physical_plan::ColumnarValue,
4340
};
4441
use datafusion_common::{
4542
cast::{as_binary_array, as_generic_string_array},
4643
exec_err, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
4744
};
48-
use datafusion_physical_expr::{math_expressions, udf::ScalarUDF};
45+
use datafusion_expr::ScalarUDF;
4946
use num::{
5047
integer::{div_ceil, div_floor},
5148
BigInt, Signed, ToPrimitive,
@@ -66,9 +63,7 @@ macro_rules! make_comet_scalar_udf {
6663
$data_type.clone(),
6764
Arc::new(move |args| $func(args, &$data_type)),
6865
);
69-
Ok(ScalarFunctionDefinition::UDF(Arc::new(
70-
ScalarUDF::new_from_impl(scalar_func),
71-
)))
66+
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
7267
}};
7368
($name:expr, $func:expr, without $data_type:ident) => {{
7469
let scalar_func = CometScalarFunction::new(
@@ -77,9 +72,7 @@ macro_rules! make_comet_scalar_udf {
7772
$data_type,
7873
$func,
7974
);
80-
Ok(ScalarFunctionDefinition::UDF(Arc::new(
81-
ScalarUDF::new_from_impl(scalar_func),
82-
)))
75+
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
8376
}};
8477
}
8578

@@ -88,7 +81,7 @@ pub fn create_comet_physical_fun(
8881
fun_name: &str,
8982
data_type: DataType,
9083
registry: &dyn FunctionRegistry,
91-
) -> Result<ScalarFunctionDefinition, DataFusionError> {
84+
) -> Result<Arc<ScalarUDF>, DataFusionError> {
9285
let sha2_functions = ["sha224", "sha256", "sha384", "sha512"];
9386
match fun_name {
9487
"ceil" => {
@@ -140,13 +133,11 @@ pub fn create_comet_physical_fun(
140133
let spark_func_name = "spark".to_owned() + sha;
141134
make_comet_scalar_udf!(spark_func_name, wrapped_func, without data_type)
142135
}
143-
_ => {
144-
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
145-
Ok(ScalarFunctionDefinition::BuiltIn(fun))
146-
} else {
147-
Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?))
148-
}
149-
}
136+
_ => registry.udf(fun_name).map_err(|e| {
137+
DataFusionError::Execution(format!(
138+
"Function {fun_name} not found in the registry: {e}",
139+
))
140+
}),
150141
}
151142
}
152143

@@ -509,9 +500,7 @@ fn spark_round(
509500
make_decimal_array(array, precision, scale, &f)
510501
}
511502
DataType::Float32 | DataType::Float64 => {
512-
Ok(ColumnarValue::Array(math_expressions::round(&[
513-
array.clone()
514-
])?))
503+
Ok(ColumnarValue::Array(round(&[array.clone()])?))
515504
}
516505
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
517506
},
@@ -534,7 +523,7 @@ fn spark_round(
534523
make_decimal_scalar(a, precision, scale, &f)
535524
}
536525
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
537-
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
526+
ScalarValue::try_from_array(&round(&[a.to_array()?])?, 0)?,
538527
)),
539528
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
540529
},

0 commit comments

Comments (0)