Skip to content

Commit ab1d30a

Browse files
authored
chore: Move temporal kernels and expressions to spark-expr crate (apache#660)
* Move temporal expressions to spark-expr crate
* reduce public api
* reduce public api
* update imports in benchmarks
* fmt
* remove unused dep
1 parent c434872 commit ab1d30a

File tree

12 files changed

+69
-55
lines changed

12 files changed

+69
-55
lines changed

native/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ bytes = "1.5.0"
6464
tempfile = "3.8.0"
6565
ahash = { version = "0.8", default-features = false }
6666
itertools = "0.11.0"
67-
chrono = { workspace = true }
6867
paste = "1.0.14"
6968
datafusion-common = { workspace = true }
7069
datafusion = { workspace = true }

native/core/benches/cast_from_string.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use arrow_array::{builder::StringBuilder, RecordBatch};
1919
use arrow_schema::{DataType, Field, Schema};
20-
use comet::execution::datafusion::expressions::{cast::Cast, EvalMode};
2120
use criterion::{criterion_group, criterion_main, Criterion};
21+
use datafusion_comet_spark_expr::{Cast, EvalMode};
2222
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
2323
use std::sync::Arc;
2424

native/core/benches/cast_numeric.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use arrow_array::{builder::Int32Builder, RecordBatch};
1919
use arrow_schema::{DataType, Field, Schema};
20-
use comet::execution::datafusion::expressions::{cast::Cast, EvalMode};
2120
use criterion::{criterion_group, criterion_main, Criterion};
21+
use datafusion_comet_spark_expr::{Cast, EvalMode};
2222
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
2323
use std::sync::Arc;
2424

native/core/src/execution/datafusion/expressions/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
//! Native DataFusion expressions
1919
2020
pub mod bitwise_not;
21-
pub use datafusion_comet_spark_expr::cast;
2221
pub mod checkoverflow;
2322
mod normalize_nan;
2423
pub mod scalar_funcs;
@@ -37,7 +36,6 @@ pub mod stddev;
3736
pub mod strings;
3837
pub mod subquery;
3938
pub mod sum_decimal;
40-
pub mod temporal;
4139
pub mod unbound;
4240
mod utils;
4341
pub mod variance;

native/core/src/execution/datafusion/expressions/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
// under the License.
1717

1818
// re-export for legacy reasons
19-
pub use datafusion_comet_spark_expr::utils::{array_with_timezone, down_cast_any_ref};
19+
pub use datafusion_comet_spark_expr::utils::down_cast_any_ref;

native/core/src/execution/datafusion/planner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ use crate::{
7575
avg_decimal::AvgDecimal,
7676
bitwise_not::BitwiseNotExpr,
7777
bloom_filter_might_contain::BloomFilterMightContain,
78-
cast::Cast,
7978
checkoverflow::CheckOverflow,
8079
correlation::Correlation,
8180
covariance::Covariance,
@@ -86,7 +85,6 @@ use crate::{
8685
strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExec, SubstringExec},
8786
subquery::Subquery,
8887
sum_decimal::SumDecimal,
89-
temporal::{DateTruncExec, HourExec, MinuteExec, SecondExec, TimestampTruncExec},
9088
unbound::UnboundColumn,
9189
variance::Variance,
9290
NormalizeNaNAndZero,
@@ -107,7 +105,9 @@ use crate::{
107105
};
108106

109107
use super::expressions::{create_named_struct::CreateNamedStruct, EvalMode};
110-
use datafusion_comet_spark_expr::{Abs, IfExpr};
108+
use datafusion_comet_spark_expr::{
109+
Abs, Cast, DateTruncExec, HourExec, IfExpr, MinuteExec, SecondExec, TimestampTruncExec,
110+
};
111111

112112
// For clippy error on type_complexity.
113113
type ExecResult<T> = Result<T, ExecutionError>;

native/core/src/execution/kernels/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,3 @@ mod hash;
2121
pub use hash::hash;
2222

2323
pub(crate) mod strings;
24-
pub(crate) mod temporal;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Kernels
19+
20+
pub(crate) mod temporal;

native/core/src/execution/kernels/temporal.rs renamed to native/spark-expr/src/kernels/temporal.rs

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,18 @@ use arrow_array::{
3232

3333
use arrow_schema::TimeUnit;
3434

35-
use crate::errors::ExpressionError;
35+
use crate::SparkError;
3636

3737
// Copied from arrow_arith/temporal.rs
3838
macro_rules! return_compute_error_with {
3939
($msg:expr, $param:expr) => {
40-
return {
41-
Err(ExpressionError::ArrowError(format!(
42-
"{}: {:?}",
43-
$msg, $param
44-
)))
45-
}
40+
return { Err(SparkError::Internal(format!("{}: {:?}", $msg, $param))) }
4641
};
4742
}
4843

4944
// The number of days between the beginning of the proleptic Gregorian calendar (0001-01-01)
5045
// and the beginning of the Unix Epoch (1970-01-01)
5146
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;
52-
const MICROS_TO_UNIX_EPOCH: i64 = 62_167_132_800 * 1_000_000;
5347

5448
// Copied from arrow_arith/temporal.rs with modification to the output datatype
5549
// Transforms an array of NaiveDate to an array of Date32 after applying an operation
@@ -102,7 +96,7 @@ fn as_timestamp_tz_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalT
10296
mut builder: PrimitiveBuilder<TimestampMicrosecondType>,
10397
tz: &str,
10498
op: F,
105-
) -> Result<TimestampMicrosecondArray, ExpressionError>
99+
) -> Result<TimestampMicrosecondArray, SparkError>
106100
where
107101
F: Fn(DateTime<Tz>) -> i64,
108102
i64: From<T::Native>,
@@ -113,7 +107,7 @@ where
113107
Some(value) => match as_datetime_with_timezone::<T>(value.into(), tz) {
114108
Some(time) => builder.append_value(op(time)),
115109
_ => {
116-
return Err(ExpressionError::ArrowError(
110+
return Err(SparkError::Internal(
117111
"Unable to read value as datetime".to_string(),
118112
));
119113
}
@@ -129,7 +123,7 @@ fn as_timestamp_tz_with_op_single<T: ArrowTemporalType, F>(
129123
builder: &mut PrimitiveBuilder<TimestampMicrosecondType>,
130124
tz: &Tz,
131125
op: F,
132-
) -> Result<(), ExpressionError>
126+
) -> Result<(), SparkError>
133127
where
134128
F: Fn(DateTime<Tz>) -> i64,
135129
i64: From<T::Native>,
@@ -138,7 +132,7 @@ where
138132
Some(value) => match as_datetime_with_timezone::<T>(value.into(), *tz) {
139133
Some(time) => builder.append_value(op(time)),
140134
_ => {
141-
return Err(ExpressionError::ArrowError(
135+
return Err(SparkError::Internal(
142136
"Unable to read value as datetime".to_string(),
143137
));
144138
}
@@ -256,7 +250,7 @@ fn trunc_date_to_microsec<T: Timelike>(dt: T) -> Option<T> {
256250
/// array is an array of Date32 values. The array may be a dictionary array.
257251
///
258252
/// format is a scalar string specifying the format to apply to the timestamp value.
259-
pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, ExpressionError> {
253+
pub(crate) fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, SparkError> {
260254
match array.data_type().clone() {
261255
DataType::Dictionary(_, _) => {
262256
downcast_dictionary_array!(
@@ -279,10 +273,10 @@ pub fn date_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, Exp
279273
}
280274
}
281275

282-
pub fn date_trunc<T>(
276+
pub(crate) fn date_trunc<T>(
283277
array: &PrimitiveArray<T>,
284278
format: String,
285-
) -> Result<Date32Array, ExpressionError>
279+
) -> Result<Date32Array, SparkError>
286280
where
287281
T: ArrowTemporalType + ArrowNumericType,
288282
i64: From<T::Native>,
@@ -311,7 +305,7 @@ where
311305
builder,
312306
|dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
313307
)),
314-
_ => Err(ExpressionError::ArrowError(format!(
308+
_ => Err(SparkError::Internal(format!(
315309
"Unsupported format: {:?} for function 'date_trunc'",
316310
format
317311
))),
@@ -331,10 +325,10 @@ where
331325
///
332326
/// format is an array of strings specifying the format to apply to the corresponding date value.
333327
/// The array may be a dictionary array.
334-
pub fn date_trunc_array_fmt_dyn(
328+
pub(crate) fn date_trunc_array_fmt_dyn(
335329
array: &dyn Array,
336330
formats: &dyn Array,
337-
) -> Result<ArrayRef, ExpressionError> {
331+
) -> Result<ArrayRef, SparkError> {
338332
match (array.data_type().clone(), formats.data_type().clone()) {
339333
(DataType::Dictionary(_, v), DataType::Dictionary(_, f)) => {
340334
if !matches!(*v, DataType::Date32) {
@@ -403,7 +397,7 @@ pub fn date_trunc_array_fmt_dyn(
403397
.expect("Unexpected value type in formats"),
404398
)
405399
.map(|a| Arc::new(a) as ArrayRef),
406-
(dt, fmt) => Err(ExpressionError::ArrowError(format!(
400+
(dt, fmt) => Err(SparkError::Internal(format!(
407401
"Unsupported datatype: {:}, format: {:?} for function 'date_trunc'",
408402
dt, fmt
409403
))),
@@ -434,7 +428,7 @@ macro_rules! date_trunc_array_fmt_helper {
434428
"WEEK" => Ok(as_datetime_with_op_single(val, &mut builder, |dt| {
435429
as_days_from_unix_epoch(trunc_date_to_week(dt))
436430
})),
437-
_ => Err(ExpressionError::ArrowError(format!(
431+
_ => Err(SparkError::Internal(format!(
438432
"Unsupported format: {:?} for function 'date_trunc'",
439433
$formats.value(index)
440434
))),
@@ -454,7 +448,7 @@ macro_rules! date_trunc_array_fmt_helper {
454448
fn date_trunc_array_fmt_plain_plain(
455449
array: &Date32Array,
456450
formats: &StringArray,
457-
) -> Result<Date32Array, ExpressionError>
451+
) -> Result<Date32Array, SparkError>
458452
where
459453
{
460454
let data_type = array.data_type();
@@ -464,7 +458,7 @@ where
464458
fn date_trunc_array_fmt_plain_dict<K>(
465459
array: &Date32Array,
466460
formats: &TypedDictionaryArray<K, StringArray>,
467-
) -> Result<Date32Array, ExpressionError>
461+
) -> Result<Date32Array, SparkError>
468462
where
469463
K: ArrowDictionaryKeyType,
470464
{
@@ -475,7 +469,7 @@ where
475469
fn date_trunc_array_fmt_dict_plain<K>(
476470
array: &TypedDictionaryArray<K, Date32Array>,
477471
formats: &StringArray,
478-
) -> Result<Date32Array, ExpressionError>
472+
) -> Result<Date32Array, SparkError>
479473
where
480474
K: ArrowDictionaryKeyType,
481475
{
@@ -486,7 +480,7 @@ where
486480
fn date_trunc_array_fmt_dict_dict<K, F>(
487481
array: &TypedDictionaryArray<K, Date32Array>,
488482
formats: &TypedDictionaryArray<F, StringArray>,
489-
) -> Result<Date32Array, ExpressionError>
483+
) -> Result<Date32Array, SparkError>
490484
where
491485
K: ArrowDictionaryKeyType,
492486
F: ArrowDictionaryKeyType,
@@ -503,7 +497,10 @@ where
503497
/// timezone or no timezone. The array may be a dictionary array.
504498
///
505499
/// format is a scalar string specifying the format to apply to the timestamp value.
506-
pub fn timestamp_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef, ExpressionError> {
500+
pub(crate) fn timestamp_trunc_dyn(
501+
array: &dyn Array,
502+
format: String,
503+
) -> Result<ArrayRef, SparkError> {
507504
match array.data_type().clone() {
508505
DataType::Dictionary(_, _) => {
509506
downcast_dictionary_array!(
@@ -526,10 +523,10 @@ pub fn timestamp_trunc_dyn(array: &dyn Array, format: String) -> Result<ArrayRef
526523
}
527524
}
528525

529-
pub fn timestamp_trunc<T>(
526+
pub(crate) fn timestamp_trunc<T>(
530527
array: &PrimitiveArray<T>,
531528
format: String,
532-
) -> Result<TimestampMicrosecondArray, ExpressionError>
529+
) -> Result<TimestampMicrosecondArray, SparkError>
533530
where
534531
T: ArrowTemporalType + ArrowNumericType,
535532
i64: From<T::Native>,
@@ -589,7 +586,7 @@ where
589586
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
590587
})
591588
}
592-
_ => Err(ExpressionError::ArrowError(format!(
589+
_ => Err(SparkError::Internal(format!(
593590
"Unsupported format: {:?} for function 'timestamp_trunc'",
594591
format
595592
))),
@@ -611,10 +608,10 @@ where
611608
///
612609
/// format is an array of strings specifying the format to apply to the corresponding timestamp
613610
/// value. The array may be a dictionary array.
614-
pub fn timestamp_trunc_array_fmt_dyn(
611+
pub(crate) fn timestamp_trunc_array_fmt_dyn(
615612
array: &dyn Array,
616613
formats: &dyn Array,
617-
) -> Result<ArrayRef, ExpressionError> {
614+
) -> Result<ArrayRef, SparkError> {
618615
match (array.data_type().clone(), formats.data_type().clone()) {
619616
(DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {
620617
downcast_dictionary_array!(
@@ -669,7 +666,7 @@ pub fn timestamp_trunc_array_fmt_dyn(
669666
dt => return_compute_error_with!("timestamp_trunc does not support", dt),
670667
)
671668
}
672-
(dt, fmt) => Err(ExpressionError::ArrowError(format!(
669+
(dt, fmt) => Err(SparkError::Internal(format!(
673670
"Unsupported datatype: {:}, format: {:?} for function 'timestamp_trunc'",
674671
dt, fmt
675672
))),
@@ -740,7 +737,7 @@ macro_rules! timestamp_trunc_array_fmt_helper {
740737
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
741738
})
742739
}
743-
_ => Err(ExpressionError::ArrowError(format!(
740+
_ => Err(SparkError::Internal(format!(
744741
"Unsupported format: {:?} for function 'timestamp_trunc'",
745742
$formats.value(index)
746743
))),
@@ -762,7 +759,7 @@ macro_rules! timestamp_trunc_array_fmt_helper {
762759
fn timestamp_trunc_array_fmt_plain_plain<T>(
763760
array: &PrimitiveArray<T>,
764761
formats: &StringArray,
765-
) -> Result<TimestampMicrosecondArray, ExpressionError>
762+
) -> Result<TimestampMicrosecondArray, SparkError>
766763
where
767764
T: ArrowTemporalType + ArrowNumericType,
768765
i64: From<T::Native>,
@@ -773,7 +770,7 @@ where
773770
fn timestamp_trunc_array_fmt_plain_dict<T, K>(
774771
array: &PrimitiveArray<T>,
775772
formats: &TypedDictionaryArray<K, StringArray>,
776-
) -> Result<TimestampMicrosecondArray, ExpressionError>
773+
) -> Result<TimestampMicrosecondArray, SparkError>
777774
where
778775
T: ArrowTemporalType + ArrowNumericType,
779776
i64: From<T::Native>,
@@ -786,7 +783,7 @@ where
786783
fn timestamp_trunc_array_fmt_dict_plain<T, K>(
787784
array: &TypedDictionaryArray<K, PrimitiveArray<T>>,
788785
formats: &StringArray,
789-
) -> Result<TimestampMicrosecondArray, ExpressionError>
786+
) -> Result<TimestampMicrosecondArray, SparkError>
790787
where
791788
T: ArrowTemporalType + ArrowNumericType,
792789
i64: From<T::Native>,
@@ -799,7 +796,7 @@ where
799796
fn timestamp_trunc_array_fmt_dict_dict<T, K, F>(
800797
array: &TypedDictionaryArray<K, PrimitiveArray<T>>,
801798
formats: &TypedDictionaryArray<F, StringArray>,
802-
) -> Result<TimestampMicrosecondArray, ExpressionError>
799+
) -> Result<TimestampMicrosecondArray, SparkError>
803800
where
804801
T: ArrowTemporalType + ArrowNumericType,
805802
i64: From<T::Native>,
@@ -812,7 +809,7 @@ where
812809

813810
#[cfg(test)]
814811
mod tests {
815-
use crate::execution::kernels::temporal::{
812+
use crate::kernels::temporal::{
816813
date_trunc, date_trunc_array_fmt_dyn, timestamp_trunc, timestamp_trunc_array_fmt_dyn,
817814
};
818815
use arrow_array::{

0 commit comments

Comments
 (0)