Skip to content

Commit

Permalink
[task #8987]add_to_date_function
Browse files Browse the repository at this point in the history
Signed-off-by: tangruilin <tang.ruilin@foxmail.com>
  • Loading branch information
Tangruilin committed Jan 30, 2024
1 parent d6ab343 commit 474fb34
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 2 deletions.
7 changes: 7 additions & 0 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use arrow::{
},
};
use arrow_array::cast::as_list_array;
use arrow_array::types::Date32Type;

/// A dynamically typed, nullable single value, (the single-valued counter-part
/// to arrow's [`Array`])
Expand Down Expand Up @@ -3229,6 +3230,12 @@ impl ScalarType<i64> for TimestampNanosecondType {
}
}

impl ScalarType<i32> for Date32Type {
fn scalar(r: Option<i32>) -> ScalarValue {
ScalarValue::Date32(r)
}
}

#[cfg(test)]
mod tests {
use std::cmp::Ordering;
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ pub enum BuiltinScalarFunction {
ToTimestampSeconds,
/// from_unixtime
FromUnixtime,
/// to_date
ToDate,
///now
Now,
///current_date
Expand Down Expand Up @@ -490,6 +492,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,
BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::ToDate => Volatility::Immutable,
BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
Expand Down Expand Up @@ -829,6 +832,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)),
BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
BuiltinScalarFunction::ToDate => Ok(Date32),
BuiltinScalarFunction::Now => {
Ok(Timestamp(Nanosecond, Some("+00:00".into())))
}
Expand Down Expand Up @@ -1076,6 +1080,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FromUnixtime => {
Signature::uniform(1, vec![Int64], self.volatility())
}
BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()),
BuiltinScalarFunction::Digest => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
Expand Down Expand Up @@ -1532,6 +1537,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"],
BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"],
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
BuiltinScalarFunction::ToDate => &["to_date"],

// hashing functions
BuiltinScalarFunction::Digest => &["digest"],
Expand Down
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,11 @@ nary_scalar_expr!(
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
nary_scalar_expr!(
ToDate,
to_date,
"converts a string and optional formats to a `Date32`"
);
nary_scalar_expr!(
ToTimestamp,
to_timestamp,
Expand Down
77 changes: 76 additions & 1 deletion datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use arrow::{
};
use arrow_array::temporal_conversions::NANOSECONDS;
use arrow_array::timezone::Tz;
use arrow_array::types::ArrowTimestampType;
use arrow_array::types::{ArrowTimestampType, Date32Type};
use arrow_array::GenericStringArray;
use chrono::prelude::*;
use chrono::LocalResult::Single;
Expand Down Expand Up @@ -391,6 +391,42 @@ fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
string_to_timestamp_nanos(s).map_err(|e| e.into())
}

fn to_date_impl(args: &[ColumnarValue], name: &str) -> Result<ColumnarValue> {
match args.len() {
1 => handle::<Date32Type, _, Date32Type>(
args,
|s| {
string_to_timestamp_nanos_shim(s)
.map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000))
.and_then(|v| {
v.try_into().map_err(|_| {
DataFusionError::NotImplemented("()".to_string())
})
})
},
name,
),
n if n >= 2 => handle_multiple::<Date32Type, _, Date32Type, _>(
args,
|s, format| {
string_to_timestamp_nanos_formatted(s, format)
.map(|n| {
println!("{n}");
n / (1_000_000 * 24 * 60 * 60 * 1_000)
})
.and_then(|v| {
v.try_into().map_err(|_| {
DataFusionError::NotImplemented("()".to_string())
})
})
},
|n| n,
name,
),
_ => internal_err!("Unsupported 0 argument count for function {name}"),
}
}

fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
args: &[ColumnarValue],
name: &str,
Expand Down Expand Up @@ -418,6 +454,11 @@ fn to_timestamp_impl<T: ArrowTimestampType + ScalarType<i64>>(
}
}

/// to_date SQL function
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
to_date_impl(args, "to_date")
}

/// to_timestamp SQL function
///
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
Expand Down Expand Up @@ -1210,6 +1251,40 @@ fn validate_to_timestamp_data_types(
None
}

// TODO: 实现这个函数
/// to_date SQL function implementation
pub fn to_date_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() {
return internal_err!(
"to_date function requires 1 or more arguments, got {}",
args.len()
);
}

// validate that any args after the first one are Utf8
if args.len() > 1 {
if let Some(value) = validate_to_timestamp_data_types(args, "to_date") {
return value;
}
}

match args[0].data_type() {
DataType::Int32 | DataType::Int64 => {
cast_column(&args[0], &DataType::Date32, None)
}
DataType::Null | DataType::Float64 => {
cast_column(&args[0], &DataType::Date32, None)
}
DataType::Date32 | DataType::Date64 => {
cast_column(&args[0], &DataType::Date32, None)
}
DataType::Utf8 => to_date(args),
other => {
internal_err!("Unsupported data type {:?} for function to_date", other)
}
}
}

/// to_timestamp() SQL function implementation
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ pub fn create_physical_fun(
BuiltinScalarFunction::FromUnixtime => {
Arc::new(datetime_expressions::from_unixtime_invoke)
}
BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke),
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function_inner(string_expressions::initcap::<i32>)(args)
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ enum ScalarFunction {
ArrayResize = 130;
EndsWith = 131;
InStr = 132;
ToDate = 133;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use datafusion_common::{
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
Result, ScalarValue,
};
use datafusion_expr::expr::{Alias, Placeholder};
use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct,
Expand Down Expand Up @@ -72,6 +71,10 @@ use datafusion_expr::{
JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound,
WindowFrameUnits,
};
use datafusion_expr::{
expr::{Alias, Placeholder},
to_date,
};

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -572,6 +575,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::Levenshtein => Self::Levenshtein,
ScalarFunction::SubstrIndex => Self::SubstrIndex,
ScalarFunction::FindInSet => Self::FindInSet,
ScalarFunction::ToDate => Self::ToDate,
}
}
}
Expand Down Expand Up @@ -1813,6 +1817,16 @@ pub fn parse_expr(
ScalarFunction::StructFun => {
Ok(struct_fun(parse_expr(&args[0], registry)?))
}
ScalarFunction::ToDate => {
let args: Vec<_> = args
.iter()
.map(|expr| parse_expr(expr, registry))
.collect::<std::result::Result<_, _>>()?;
Ok(Expr::ScalarFunction(expr::ScalarFunction::new(
BuiltinScalarFunction::ToDate,
args,
)))
}
}
}
ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::Levenshtein => Self::Levenshtein,
BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex,
BuiltinScalarFunction::FindInSet => Self::FindInSet,
BuiltinScalarFunction::ToDate => Self::ToDate,
};

Ok(scalar_function)
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/dates.slt
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,19 @@ query ?
SELECT '2023-01-01T00:00:00'::timestamp - DATE '2021-01-01';
----
730 days 0 hours 0 mins 0.000000000 secs

# to_date_test
statement ok
create table to_date_t1(ts bigint) as VALUES
(1235865600000),
(1235865660000),
(1238544000000);


# query_cast_timestamp_millis
query P
SELECT to_date(ts / 1000) FROM to_date_t1 LIMIT 3
----
2009-03-01T00:00:00
2009-03-01T00:01:00
2009-04-01T00:00:00

0 comments on commit 474fb34

Please sign in to comment.