From 474fb34d7ca893ede2f7bdc84b66c596e4d2ce91 Mon Sep 17 00:00:00 2001 From: tangruilin Date: Thu, 25 Jan 2024 21:34:34 +0800 Subject: [PATCH] [task #8987]add_to_date_function Signed-off-by: tangruilin --- datafusion/common/src/scalar.rs | 7 ++ datafusion/expr/src/built_in_function.rs | 6 ++ datafusion/expr/src/expr_fn.rs | 5 ++ .../physical-expr/src/datetime_expressions.rs | 77 ++++++++++++++++++- datafusion/physical-expr/src/functions.rs | 1 + datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 3 + datafusion/proto/src/generated/prost.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 16 +++- datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sqllogictest/test_files/dates.slt | 16 ++++ 11 files changed, 134 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 99b8cff20de74..a1de7f36d1109 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -52,6 +52,7 @@ use arrow::{ }, }; use arrow_array::cast::as_list_array; +use arrow_array::types::Date32Type; /// A dynamically typed, nullable single value, (the single-valued counter-part /// to arrow's [`Array`]) @@ -3229,6 +3230,12 @@ impl ScalarType for TimestampNanosecondType { } } +impl ScalarType for Date32Type { + fn scalar(r: Option) -> ScalarValue { + ScalarValue::Date32(r) + } +} + #[cfg(test)] mod tests { use std::cmp::Ordering; diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index e86d6172cecdd..b399088a0dd46 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -289,6 +289,8 @@ pub enum BuiltinScalarFunction { ToTimestampSeconds, /// from_unixtime FromUnixtime, + /// to_date + ToDate, ///now Now, ///current_date @@ -490,6 +492,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::RegexpMatch => Volatility::Immutable, BuiltinScalarFunction::Struct => Volatility::Immutable, BuiltinScalarFunction::FromUnixtime => Volatility::Immutable, + BuiltinScalarFunction::ToDate => Volatility::Immutable, BuiltinScalarFunction::ArrowTypeof => Volatility::Immutable, BuiltinScalarFunction::OverLay => Volatility::Immutable, BuiltinScalarFunction::Levenshtein => Volatility::Immutable, @@ -829,6 +832,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ToTimestampMicros => Ok(Timestamp(Microsecond, None)), BuiltinScalarFunction::ToTimestampSeconds => Ok(Timestamp(Second, None)), BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)), + BuiltinScalarFunction::ToDate => Ok(Date32), BuiltinScalarFunction::Now => { Ok(Timestamp(Nanosecond, Some("+00:00".into()))) } @@ -1076,6 +1080,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::FromUnixtime => { Signature::uniform(1, vec![Int64], self.volatility()) } + BuiltinScalarFunction::ToDate => Signature::variadic_any(self.volatility()), BuiltinScalarFunction::Digest => Signature::one_of( vec![ Exact(vec![Utf8, Utf8]), @@ -1532,6 +1537,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ToTimestampSeconds => &["to_timestamp_seconds"], BuiltinScalarFunction::ToTimestampNanos => &["to_timestamp_nanos"], BuiltinScalarFunction::FromUnixtime => &["from_unixtime"], + BuiltinScalarFunction::ToDate => &["to_date"], // hashing functions BuiltinScalarFunction::Digest => &["digest"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 006b5f10f10d1..b071482fe7d7c 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -887,6 +887,11 @@ nary_scalar_expr!( scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date"); scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision"); scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval"); +nary_scalar_expr!( + ToDate, + to_date, + "converts a string and optional formats to a `Date32`" +); nary_scalar_expr!( ToTimestamp, to_timestamp, diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index d21d89c19d2eb..ffa173866618f 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -35,7 +35,7 @@ use arrow::{ }; use arrow_array::temporal_conversions::NANOSECONDS; use arrow_array::timezone::Tz; -use arrow_array::types::ArrowTimestampType; +use arrow_array::types::{ArrowTimestampType, Date32Type}; use arrow_array::GenericStringArray; use chrono::prelude::*; use chrono::LocalResult::Single; @@ -391,6 +391,42 @@ fn string_to_timestamp_nanos_shim(s: &str) -> Result { string_to_timestamp_nanos(s).map_err(|e| e.into()) } +fn to_date_impl(args: &[ColumnarValue], name: &str) -> Result { + match args.len() { + 1 => handle::( + args, + |s| { + string_to_timestamp_nanos_shim(s) + .map(|n| n / (1_000_000 * 24 * 60 * 60 * 1_000)) + .and_then(|v| { + v.try_into().map_err(|_| { + DataFusionError::NotImplemented("()".to_string()) + }) + }) + }, + name, + ), + n if n >= 2 => handle_multiple::( + args, + |s, format| { + string_to_timestamp_nanos_formatted(s, format) + .map(|n| { + println!("{n}"); + n / (1_000_000 * 24 * 60 * 60 * 1_000) + }) + .and_then(|v| { + v.try_into().map_err(|_| { + DataFusionError::NotImplemented("()".to_string()) + }) + }) + }, + |n| n, + name, + ), + _ => internal_err!("Unsupported 0 argument count for function {name}"), + } +} + fn to_timestamp_impl>( args: &[ColumnarValue], name: &str, @@ -418,6 +454,11 @@ fn to_timestamp_impl>( } } +/// to_date SQL function +pub fn to_date(args: &[ColumnarValue]) -> Result { + to_date_impl(args, "to_date") +} + /// to_timestamp SQL function /// /// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**. @@ -1210,6 +1251,40 @@ fn validate_to_timestamp_data_types( None } +// TODO: 实现这个函数 +/// to_date SQL function implementation +pub fn to_date_invoke(args: &[ColumnarValue]) -> Result { + if args.is_empty() { + return internal_err!( + "to_date function requires 1 or more arguments, got {}", + args.len() + ); + } + + // validate that any args after the first one are Utf8 + if args.len() > 1 { + if let Some(value) = validate_to_timestamp_data_types(args, "to_date") { + return value; + } + } + + match args[0].data_type() { + DataType::Int32 | DataType::Int64 => { + cast_column(&args[0], &DataType::Date32, None) + } + DataType::Null | DataType::Float64 => { + cast_column(&args[0], &DataType::Date32, None) + } + DataType::Date32 | DataType::Date64 => { + cast_column(&args[0], &DataType::Date32, None) + } + DataType::Utf8 => to_date(args), + other => { + internal_err!("Unsupported data type {:?} for function to_date", other) + } + } +} + /// to_timestamp() SQL function implementation pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { if args.is_empty() { diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 2bfdf499123b1..868588bcbdc36 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -592,6 +592,7 @@ pub fn create_physical_fun( BuiltinScalarFunction::FromUnixtime => { Arc::new(datetime_expressions::from_unixtime_invoke) } + BuiltinScalarFunction::ToDate => Arc::new(datetime_expressions::to_date_invoke), BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function_inner(string_expressions::initcap::)(args) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 66c1271e65c1f..f65828090cacd 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -671,6 +671,7 @@ enum ScalarFunction { ArrayResize = 130; EndsWith = 131; InStr = 132; + ToDate = 133; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 39a8678ef2509..47b8d493a42d2 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22425,6 +22425,7 @@ impl serde::Serialize for ScalarFunction { Self::ArrayResize => "ArrayResize", Self::EndsWith => "EndsWith", Self::InStr => "InStr", + Self::ToDate => "ToDate", }; serializer.serialize_str(variant) } @@ -22569,6 +22570,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayResize", "EndsWith", "InStr", + "ToDate", ]; struct GeneratedVisitor; @@ -22742,6 +22744,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "ArrayResize" => Ok(ScalarFunction::ArrayResize), "EndsWith" => Ok(ScalarFunction::EndsWith), "InStr" => Ok(ScalarFunction::InStr), + "ToDate" => Ok(ScalarFunction::ToDate), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 7bf1d8ed04509..7e978fe3acae6 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2766,6 +2766,7 @@ pub enum ScalarFunction { ArrayResize = 130, EndsWith = 131, InStr = 132, + ToDate = 133, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2907,6 +2908,7 @@ impl ScalarFunction { ScalarFunction::ArrayResize => "ArrayResize", ScalarFunction::EndsWith => "EndsWith", ScalarFunction::InStr => "InStr", + ScalarFunction::ToDate => "ToDate", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -3045,6 +3047,7 @@ impl ScalarFunction { "ArrayResize" => Some(Self::ArrayResize), "EndsWith" => Some(Self::EndsWith), "InStr" => Some(Self::InStr), + "ToDate" => Some(Self::ToDate), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 42d39b5c51395..ffee9464e16fc 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -44,7 +44,6 @@ use datafusion_common::{ Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, }; -use datafusion_expr::expr::{Alias, Placeholder}; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct, @@ -72,6 +71,10 @@ use datafusion_expr::{ JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, }; +use datafusion_expr::{ + expr::{Alias, Placeholder}, + to_date, +}; #[derive(Debug)] pub enum Error { @@ -572,6 +575,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Levenshtein => Self::Levenshtein, ScalarFunction::SubstrIndex => Self::SubstrIndex, ScalarFunction::FindInSet => Self::FindInSet, + ScalarFunction::ToDate => Self::ToDate, } } } @@ -1813,6 +1817,16 @@ pub fn parse_expr( ScalarFunction::StructFun => { Ok(struct_fun(parse_expr(&args[0], registry)?)) } + ScalarFunction::ToDate => { + let args: Vec<_> = args + .iter() + .map(|expr| parse_expr(expr, registry)) + .collect::>()?; + Ok(Expr::ScalarFunction(expr::ScalarFunction::new( + BuiltinScalarFunction::ToDate, + args, + ))) + } } } ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, args }) => { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index dbb52eced36c9..3366743ae677a 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1566,6 +1566,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Levenshtein => Self::Levenshtein, BuiltinScalarFunction::SubstrIndex => Self::SubstrIndex, BuiltinScalarFunction::FindInSet => Self::FindInSet, + BuiltinScalarFunction::ToDate => Self::ToDate, }; Ok(scalar_function) diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt index a93a7ff7e73cd..0e2ba50574149 100644 --- a/datafusion/sqllogictest/test_files/dates.slt +++ b/datafusion/sqllogictest/test_files/dates.slt @@ -107,3 +107,19 @@ query ? SELECT '2023-01-01T00:00:00'::timestamp - DATE '2021-01-01'; ---- 730 days 0 hours 0 mins 0.000000000 secs + +# to_date_test +statement ok +create table to_date_t1(ts bigint) as VALUES + (1235865600000), + (1235865660000), + (1238544000000); + + +# query_cast_timestamp_millis +query P +SELECT to_date(ts / 1000) FROM to_date_t1 LIMIT 3 +---- +2009-03-01T00:00:00 +2009-03-01T00:01:00 +2009-04-01T00:00:00