Skip to content

Commit e727bbf

Browse files
authored
cleanup scalar function impl (#8114)
1 parent 7fde76e commit e727bbf

File tree

3 files changed

+200
-146
lines changed

3 files changed

+200
-146
lines changed

datafusion/physical-expr/src/datetime_expressions.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! DateTime expressions
1919
20+
use crate::datetime_expressions;
21+
use crate::expressions::cast_column;
2022
use arrow::array::Float64Builder;
2123
use arrow::compute::cast;
2224
use arrow::{
@@ -954,6 +956,154 @@ where
954956
Ok(b.finish())
955957
}
956958

959+
/// to_timestammp() SQL function implementation
960+
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
961+
if args.len() != 1 {
962+
return internal_err!(
963+
"to_timestamp function requires 1 arguments, got {}",
964+
args.len()
965+
);
966+
}
967+
968+
match args[0].data_type() {
969+
DataType::Int64 => {
970+
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
971+
}
972+
DataType::Timestamp(_, None) => cast_column(
973+
&args[0],
974+
&DataType::Timestamp(TimeUnit::Nanosecond, None),
975+
None,
976+
),
977+
DataType::Utf8 => datetime_expressions::to_timestamp(args),
978+
other => {
979+
internal_err!(
980+
"Unsupported data type {:?} for function to_timestamp",
981+
other
982+
)
983+
}
984+
}
985+
}
986+
987+
/// to_timestamp_millis() SQL function implementation
988+
pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
989+
if args.len() != 1 {
990+
return internal_err!(
991+
"to_timestamp_millis function requires 1 argument, got {}",
992+
args.len()
993+
);
994+
}
995+
996+
match args[0].data_type() {
997+
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
998+
&args[0],
999+
&DataType::Timestamp(TimeUnit::Millisecond, None),
1000+
None,
1001+
),
1002+
DataType::Utf8 => datetime_expressions::to_timestamp_millis(args),
1003+
other => {
1004+
internal_err!(
1005+
"Unsupported data type {:?} for function to_timestamp_millis",
1006+
other
1007+
)
1008+
}
1009+
}
1010+
}
1011+
1012+
/// to_timestamp_micros() SQL function implementation
1013+
pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1014+
if args.len() != 1 {
1015+
return internal_err!(
1016+
"to_timestamp_micros function requires 1 argument, got {}",
1017+
args.len()
1018+
);
1019+
}
1020+
1021+
match args[0].data_type() {
1022+
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
1023+
&args[0],
1024+
&DataType::Timestamp(TimeUnit::Microsecond, None),
1025+
None,
1026+
),
1027+
DataType::Utf8 => datetime_expressions::to_timestamp_micros(args),
1028+
other => {
1029+
internal_err!(
1030+
"Unsupported data type {:?} for function to_timestamp_micros",
1031+
other
1032+
)
1033+
}
1034+
}
1035+
}
1036+
1037+
/// to_timestamp_nanos() SQL function implementation
1038+
pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1039+
if args.len() != 1 {
1040+
return internal_err!(
1041+
"to_timestamp_nanos function requires 1 argument, got {}",
1042+
args.len()
1043+
);
1044+
}
1045+
1046+
match args[0].data_type() {
1047+
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
1048+
&args[0],
1049+
&DataType::Timestamp(TimeUnit::Nanosecond, None),
1050+
None,
1051+
),
1052+
DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args),
1053+
other => {
1054+
internal_err!(
1055+
"Unsupported data type {:?} for function to_timestamp_nanos",
1056+
other
1057+
)
1058+
}
1059+
}
1060+
}
1061+
1062+
/// to_timestamp_seconds() SQL function implementation
1063+
pub fn to_timestamp_seconds_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1064+
if args.len() != 1 {
1065+
return internal_err!(
1066+
"to_timestamp_seconds function requires 1 argument, got {}",
1067+
args.len()
1068+
);
1069+
}
1070+
1071+
match args[0].data_type() {
1072+
DataType::Int64 | DataType::Timestamp(_, None) => {
1073+
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
1074+
}
1075+
DataType::Utf8 => datetime_expressions::to_timestamp_seconds(args),
1076+
other => {
1077+
internal_err!(
1078+
"Unsupported data type {:?} for function to_timestamp_seconds",
1079+
other
1080+
)
1081+
}
1082+
}
1083+
}
1084+
1085+
/// from_unixtime() SQL function implementation
1086+
pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
1087+
if args.len() != 1 {
1088+
return internal_err!(
1089+
"from_unixtime function requires 1 argument, got {}",
1090+
args.len()
1091+
);
1092+
}
1093+
1094+
match args[0].data_type() {
1095+
DataType::Int64 => {
1096+
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
1097+
}
1098+
other => {
1099+
internal_err!(
1100+
"Unsupported data type {:?} for function from_unixtime",
1101+
other
1102+
)
1103+
}
1104+
}
1105+
}
1106+
9571107
#[cfg(test)]
9581108
mod tests {
9591109
use std::sync::Arc;

datafusion/physical-expr/src/functions.rs

Lines changed: 38 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@ use crate::execution_props::ExecutionProps;
3434
use crate::sort_properties::SortProperties;
3535
use crate::{
3636
array_expressions, conditional_expressions, datetime_expressions,
37-
expressions::{cast_column, nullif_func},
38-
math_expressions, string_expressions, struct_expressions, PhysicalExpr,
39-
ScalarFunctionExpr,
37+
expressions::nullif_func, math_expressions, string_expressions, struct_expressions,
38+
PhysicalExpr, ScalarFunctionExpr,
4039
};
4140
use arrow::{
4241
array::ArrayRef,
4342
compute::kernels::length::{bit_length, length},
44-
datatypes::TimeUnit,
4543
datatypes::{DataType, Int32Type, Int64Type, Schema},
4644
};
4745
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
@@ -71,143 +69,8 @@ pub fn create_physical_expr(
7169

7270
let data_type = fun.return_type(&input_expr_types)?;
7371

74-
let fun_expr: ScalarFunctionImplementation = match fun {
75-
// These functions need args and input schema to pick an implementation
76-
// Unlike the string functions, which actually figure out the function to use with each array,
77-
// here we return either a cast fn or string timestamp translation based on the expression data type
78-
// so we don't have to pay a per-array/batch cost.
79-
BuiltinScalarFunction::ToTimestamp => {
80-
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
81-
Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
82-
cast_column(
83-
&col_values[0],
84-
&DataType::Timestamp(TimeUnit::Second, None),
85-
None,
86-
)
87-
},
88-
Ok(DataType::Timestamp(_, None)) => |col_values: &[ColumnarValue]| {
89-
cast_column(
90-
&col_values[0],
91-
&DataType::Timestamp(TimeUnit::Nanosecond, None),
92-
None,
93-
)
94-
},
95-
Ok(DataType::Utf8) => datetime_expressions::to_timestamp,
96-
other => {
97-
return internal_err!(
98-
"Unsupported data type {other:?} for function to_timestamp"
99-
);
100-
}
101-
})
102-
}
103-
BuiltinScalarFunction::ToTimestampMillis => {
104-
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
105-
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
106-
|col_values: &[ColumnarValue]| {
107-
cast_column(
108-
&col_values[0],
109-
&DataType::Timestamp(TimeUnit::Millisecond, None),
110-
None,
111-
)
112-
}
113-
}
114-
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_millis,
115-
other => {
116-
return internal_err!(
117-
"Unsupported data type {other:?} for function to_timestamp_millis"
118-
);
119-
}
120-
})
121-
}
122-
BuiltinScalarFunction::ToTimestampMicros => {
123-
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
124-
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
125-
|col_values: &[ColumnarValue]| {
126-
cast_column(
127-
&col_values[0],
128-
&DataType::Timestamp(TimeUnit::Microsecond, None),
129-
None,
130-
)
131-
}
132-
}
133-
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_micros,
134-
other => {
135-
return internal_err!(
136-
"Unsupported data type {other:?} for function to_timestamp_micros"
137-
);
138-
}
139-
})
140-
}
141-
BuiltinScalarFunction::ToTimestampNanos => {
142-
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
143-
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
144-
|col_values: &[ColumnarValue]| {
145-
cast_column(
146-
&col_values[0],
147-
&DataType::Timestamp(TimeUnit::Nanosecond, None),
148-
None,
149-
)
150-
}
151-
}
152-
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_nanos,
153-
other => {
154-
return internal_err!(
155-
"Unsupported data type {other:?} for function to_timestamp_nanos"
156-
);
157-
}
158-
})
159-
}
160-
BuiltinScalarFunction::ToTimestampSeconds => Arc::new({
161-
match input_phy_exprs[0].data_type(input_schema) {
162-
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
163-
|col_values: &[ColumnarValue]| {
164-
cast_column(
165-
&col_values[0],
166-
&DataType::Timestamp(TimeUnit::Second, None),
167-
None,
168-
)
169-
}
170-
}
171-
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_seconds,
172-
other => {
173-
return internal_err!(
174-
"Unsupported data type {other:?} for function to_timestamp_seconds"
175-
);
176-
}
177-
}
178-
}),
179-
BuiltinScalarFunction::FromUnixtime => Arc::new({
180-
match input_phy_exprs[0].data_type(input_schema) {
181-
Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
182-
cast_column(
183-
&col_values[0],
184-
&DataType::Timestamp(TimeUnit::Second, None),
185-
None,
186-
)
187-
},
188-
other => {
189-
return internal_err!(
190-
"Unsupported data type {other:?} for function from_unixtime"
191-
);
192-
}
193-
}
194-
}),
195-
BuiltinScalarFunction::ArrowTypeof => {
196-
let input_data_type = input_phy_exprs[0].data_type(input_schema)?;
197-
Arc::new(move |_| {
198-
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!(
199-
"{input_data_type}"
200-
)))))
201-
})
202-
}
203-
BuiltinScalarFunction::Abs => {
204-
let input_data_type = input_phy_exprs[0].data_type(input_schema)?;
205-
let abs_fun = math_expressions::create_abs_function(&input_data_type)?;
206-
Arc::new(move |args| make_scalar_function(abs_fun)(args))
207-
}
208-
// These don't need args and input schema
209-
_ => create_physical_fun(fun, execution_props)?,
210-
};
72+
let fun_expr: ScalarFunctionImplementation =
73+
create_physical_fun(fun, execution_props)?;
21174

21275
let monotonicity = fun.monotonicity();
21376

@@ -397,6 +260,9 @@ pub fn create_physical_fun(
397260
) -> Result<ScalarFunctionImplementation> {
398261
Ok(match fun {
399262
// math functions
263+
BuiltinScalarFunction::Abs => {
264+
Arc::new(|args| make_scalar_function(math_expressions::abs_invoke)(args))
265+
}
400266
BuiltinScalarFunction::Acos => Arc::new(math_expressions::acos),
401267
BuiltinScalarFunction::Asin => Arc::new(math_expressions::asin),
402268
BuiltinScalarFunction::Atan => Arc::new(math_expressions::atan),
@@ -625,6 +491,24 @@ pub fn create_physical_fun(
625491
execution_props.query_execution_start_time,
626492
))
627493
}
494+
BuiltinScalarFunction::ToTimestamp => {
495+
Arc::new(datetime_expressions::to_timestamp_invoke)
496+
}
497+
BuiltinScalarFunction::ToTimestampMillis => {
498+
Arc::new(datetime_expressions::to_timestamp_millis_invoke)
499+
}
500+
BuiltinScalarFunction::ToTimestampMicros => {
501+
Arc::new(datetime_expressions::to_timestamp_micros_invoke)
502+
}
503+
BuiltinScalarFunction::ToTimestampNanos => {
504+
Arc::new(datetime_expressions::to_timestamp_nanos_invoke)
505+
}
506+
BuiltinScalarFunction::ToTimestampSeconds => {
507+
Arc::new(datetime_expressions::to_timestamp_seconds_invoke)
508+
}
509+
BuiltinScalarFunction::FromUnixtime => {
510+
Arc::new(datetime_expressions::from_unixtime_invoke)
511+
}
628512
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
629513
DataType::Utf8 => {
630514
make_scalar_function(string_expressions::initcap::<i32>)(args)
@@ -927,11 +811,19 @@ pub fn create_physical_fun(
927811
}),
928812
BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
929813
BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid),
930-
_ => {
931-
return internal_err!(
932-
"create_physical_fun: Unsupported scalar function {fun:?}"
933-
);
934-
}
814+
BuiltinScalarFunction::ArrowTypeof => Arc::new(move |args| {
815+
if args.len() != 1 {
816+
return internal_err!(
817+
"arrow_typeof function requires 1 arguments, got {}",
818+
args.len()
819+
);
820+
}
821+
822+
let input_data_type = args[0].data_type();
823+
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!(
824+
"{input_data_type}"
825+
)))))
826+
}),
935827
})
936828
}
937829

0 commit comments

Comments
 (0)