Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make 'date_trunc' returns the same type as its input #6654

Merged
merged 9 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ async fn test_arrow_typeof() -> Result<()> {
"+--------------------------------------------------------------------------+",
"| arrow_typeof(date_trunc(Utf8(\"minute\"),to_timestamp_seconds(Int64(61)))) |",
"+--------------------------------------------------------------------------+",
"| Timestamp(Nanosecond, None) |",
"| Timestamp(Second, None) |",
"+--------------------------------------------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Expand All @@ -723,7 +723,7 @@ async fn test_arrow_typeof() -> Result<()> {
"+-------------------------------------------------------------------------+",
"| arrow_typeof(date_trunc(Utf8(\"second\"),to_timestamp_millis(Int64(61)))) |",
"+-------------------------------------------------------------------------+",
"| Timestamp(Nanosecond, None) |",
"| Timestamp(Millisecond, None) |",
"+-------------------------------------------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Expand All @@ -734,7 +734,7 @@ async fn test_arrow_typeof() -> Result<()> {
"+------------------------------------------------------------------------------+",
"| arrow_typeof(date_trunc(Utf8(\"millisecond\"),to_timestamp_micros(Int64(61)))) |",
"+------------------------------------------------------------------------------+",
"| Timestamp(Nanosecond, None) |",
"| Timestamp(Microsecond, None) |",
"+------------------------------------------------------------------------------+",
];
assert_batches_eq!(expected, &actual);
Expand Down
104 changes: 96 additions & 8 deletions datafusion/core/tests/sqllogictests/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@

statement ok
create table ts_data(ts bigint, value int) as values
(1599572549190855000, 1),
(1599568949190855000, 2),
(1599565349190855000, 3);
(1599572549190855123, 1),
(1599568949190855123, 2),
(1599565349190855123, 3);

statement ok
create table ts_data_nanos as select arrow_cast(ts, 'Timestamp(Nanosecond, None)') as ts, value from ts_data;
Expand Down Expand Up @@ -270,9 +270,9 @@ SELECT COUNT(*) FROM ts_data_secs where ts > from_unixtime(1599566400)
query P rowsort
SELECT DISTINCT ts FROM ts_data_nanos;
----
2020-09-08T11:42:29.190855
2020-09-08T12:42:29.190855
2020-09-08T13:42:29.190855
2020-09-08T11:42:29.190855123
2020-09-08T12:42:29.190855123
2020-09-08T13:42:29.190855123


query I
Expand Down Expand Up @@ -1010,6 +1010,96 @@ ts_data_secs 2020-09-08T00:00:00
ts_data_secs 2020-09-08T00:00:00
ts_data_secs 2020-09-08T00:00:00

# Test date trun on different granularity
query TP rowsort
SELECT 'millisecond', DATE_TRUNC('millisecond', ts) FROM ts_data_nanos
UNION ALL
SELECT 'microsecond', DATE_TRUNC('microsecond', ts) FROM ts_data_nanos
UNION ALL
SELECT 'second', DATE_TRUNC('second', ts) FROM ts_data_nanos
UNION ALL
SELECT 'minute', DATE_TRUNC('minute', ts) FROM ts_data_nanos
----
microsecond 2020-09-08T11:42:29.190855
microsecond 2020-09-08T12:42:29.190855
microsecond 2020-09-08T13:42:29.190855
millisecond 2020-09-08T11:42:29.190
millisecond 2020-09-08T12:42:29.190
millisecond 2020-09-08T13:42:29.190
minute 2020-09-08T11:42:00
minute 2020-09-08T12:42:00
minute 2020-09-08T13:42:00
second 2020-09-08T11:42:29
second 2020-09-08T12:42:29
second 2020-09-08T13:42:29

query TP rowsort
SELECT 'millisecond', DATE_TRUNC('millisecond', ts) FROM ts_data_micros
UNION ALL
SELECT 'microsecond', DATE_TRUNC('microsecond', ts) FROM ts_data_micros
UNION ALL
SELECT 'second', DATE_TRUNC('second', ts) FROM ts_data_micros
UNION ALL
SELECT 'minute', DATE_TRUNC('minute', ts) FROM ts_data_micros
----
microsecond 2020-09-08T11:42:29.190855
microsecond 2020-09-08T12:42:29.190855
microsecond 2020-09-08T13:42:29.190855
millisecond 2020-09-08T11:42:29.190
millisecond 2020-09-08T12:42:29.190
millisecond 2020-09-08T13:42:29.190
minute 2020-09-08T11:42:00
minute 2020-09-08T12:42:00
minute 2020-09-08T13:42:00
second 2020-09-08T11:42:29
second 2020-09-08T12:42:29
second 2020-09-08T13:42:29

query TP rowsort
SELECT 'millisecond', DATE_TRUNC('millisecond', ts) FROM ts_data_millis
UNION ALL
SELECT 'microsecond', DATE_TRUNC('microsecond', ts) FROM ts_data_millis
UNION ALL
SELECT 'second', DATE_TRUNC('second', ts) FROM ts_data_millis
UNION ALL
SELECT 'minute', DATE_TRUNC('minute', ts) FROM ts_data_millis
----
microsecond 2020-09-08T11:42:29.190
microsecond 2020-09-08T12:42:29.190
microsecond 2020-09-08T13:42:29.190
millisecond 2020-09-08T11:42:29.190
millisecond 2020-09-08T12:42:29.190
millisecond 2020-09-08T13:42:29.190
minute 2020-09-08T11:42:00
minute 2020-09-08T12:42:00
minute 2020-09-08T13:42:00
second 2020-09-08T11:42:29
second 2020-09-08T12:42:29
second 2020-09-08T13:42:29

query TP rowsort
SELECT 'millisecond', DATE_TRUNC('millisecond', ts) FROM ts_data_secs
UNION ALL
SELECT 'microsecond', DATE_TRUNC('microsecond', ts) FROM ts_data_secs
UNION ALL
SELECT 'second', DATE_TRUNC('second', ts) FROM ts_data_secs
UNION ALL
SELECT 'minute', DATE_TRUNC('minute', ts) FROM ts_data_secs
----
microsecond 2020-09-08T11:42:29
microsecond 2020-09-08T12:42:29
microsecond 2020-09-08T13:42:29
millisecond 2020-09-08T11:42:29
millisecond 2020-09-08T12:42:29
millisecond 2020-09-08T13:42:29
minute 2020-09-08T11:42:00
minute 2020-09-08T12:42:00
minute 2020-09-08T13:42:00
second 2020-09-08T11:42:29
second 2020-09-08T12:42:29
second 2020-09-08T13:42:29


# test date trunc on different timestamp scalar types and ensure they are consistent
query P rowsort
SELECT DATE_TRUNC('second', arrow_cast(TIMESTAMP '2023-08-03 14:38:50Z', 'Timestamp(Second, None)')) as ts
Expand All @@ -1026,8 +1116,6 @@ SELECT DATE_TRUNC('day', arrow_cast(TIMESTAMP '2023-08-03 14:38:50Z', 'Timestamp
2023-08-03T14:38:50




# Demonstrate that strings are automatically coerced to timestamps (don't use TIMESTAMP)

query P
Expand Down
29 changes: 18 additions & 11 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,17 +548,19 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Concat => Ok(Utf8),
BuiltinScalarFunction::ConcatWithSeparator => Ok(Utf8),
BuiltinScalarFunction::DatePart => Ok(Float64),
// DateTrunc always makes nanosecond timestamps
BuiltinScalarFunction::DateTrunc => Ok(Timestamp(Nanosecond, None)),
BuiltinScalarFunction::DateBin => match input_expr_types[1] {
Timestamp(Nanosecond, _) | Utf8 => Ok(Timestamp(Nanosecond, None)),
Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)),
Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)),
Timestamp(Second, _) => Ok(Timestamp(Second, None)),
_ => Err(DataFusionError::Internal(format!(
BuiltinScalarFunction::DateBin | BuiltinScalarFunction::DateTrunc => {
match input_expr_types[1] {
Timestamp(Nanosecond, _) | Utf8 | Null => {
Ok(Timestamp(Nanosecond, None))
}
Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)),
Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)),
Timestamp(Second, _) => Ok(Timestamp(Second, None)),
_ => Err(DataFusionError::Internal(format!(
"The {self} function can only accept timestamp as the second arg."
))),
},
}
}
BuiltinScalarFunction::InitCap => {
utf8_to_str_type(&input_expr_types[0], "initcap")
}
Expand Down Expand Up @@ -884,8 +886,13 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
BuiltinScalarFunction::DateTrunc => Signature::exact(
vec![Utf8, Timestamp(Nanosecond, None)],
BuiltinScalarFunction::DateTrunc => Signature::one_of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this clause seems unreachable given the change above to match BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin => {

I would personally suggest keeping this change and removing the BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin => { change as it make the code more explicy

vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Second, None)]),
],
self.volatility(),
),
BuiltinScalarFunction::DateBin => {
Expand Down
159 changes: 122 additions & 37 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,17 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
}

fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
if granularity == "millisecond" || granularity == "microsecond" {
return Ok(value);
}
Comment on lines +218 to +220
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm? Why this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think #6654 (comment) is the answer -- perhaps I can remove this to make it easier to understand

Copy link
Contributor

@alamb alamb Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this very confusing -- I think it is because for those granularities, the conversion to chrono::DateTime and back is unecessary (as only sec/min/hour/etc are non uniform in width) -- I will try and make it clearer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is it? #6654 (comment) notes about the truncation of timestamps to seconds. But my question is why pulling "millisecond" and "microsecond" earlier which basically avoids with_nanosecond(0) only, compared with previous code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the utilization of with_nanosecond(0), the timestamp experiences a reduction in granularity at the nanosecond level. But for "millisecond" and "microsecond", we need to keep this granularity and truncate it later.

Copy link
Member

@viirya viirya Jun 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean with_nanosecond also cleans up millisecond and microsecond in the timestamp? Otherwise nanosecond is smaller granularity than millisecond and microsecond. Why reduction at nanosecond level will affect millisecond and microsecond?

E.g, timestamp is 1.001000001 (1s + 1 ms + 1 ns). After with_nanosecond(0), it will be 1.000000000 instead of 1.001000000?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean with_nanosecond also cleans up millisecond and microsecond in the timestamp? Otherwise nanosecond is smaller granularity than millisecond and microsecond. Why reduction at nanosecond level will affect millisecond and microsecond?

Yes, I think that is correct.

I played around with it and we can simplify the logic like this which I think may make it easier to understand (the test still pass). If you like this better (I do) I can make it a PR (follow on to #6783)

diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 3a36e8a489..89ec508024 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -215,42 +215,49 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
 }
 
 fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
-    if granularity == "millisecond" || granularity == "microsecond" {
-        return Ok(value);
-    }
+    let value = timestamp_ns_to_datetime(value).ok_or_else(|| {
+        DataFusionError::Execution(format!("Timestamp {value} out of range"))
+    })?;
+
+    let value = Some(value);
 
-    let value = timestamp_ns_to_datetime(value)
-        .ok_or_else(|| {
-            DataFusionError::Execution(format!("Timestamp {value} out of range"))
-        })?
-        .with_nanosecond(0);
     let value = match granularity {
-        "second" => value,
-        "minute" => value.and_then(|d| d.with_second(0)),
+        "millisecond" => value,
+        "microsecond" => value,
+        "second" => value.and_then(|d| d.with_nanosecond(0)),
+        "minute" => value
+            .and_then(|d| d.with_nanosecond(0))
+            .and_then(|d| d.with_second(0)),
         "hour" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0)),
         "day" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0))
             .and_then(|d| d.with_hour(0)),
         "week" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0))
             .and_then(|d| d.with_hour(0))
             .map(|d| d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)),
         "month" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0))
             .and_then(|d| d.with_hour(0))
             .and_then(|d| d.with_day0(0)),
         "quarter" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0))
             .and_then(|d| d.with_hour(0))
             .and_then(|d| d.with_day0(0))
             .and_then(|d| d.with_month(quarter_month(&d))),
         "year" => value
+            .and_then(|d| d.with_nanosecond(0))
             .and_then(|d| d.with_second(0))
             .and_then(|d| d.with_minute(0))
             .and_then(|d| d.with_hour(0))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think it looks clearer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


let value = timestamp_ns_to_datetime(value)
.ok_or_else(|| {
DataFusionError::Execution(format!("Timestamp {value} out of range"))
})?
.with_nanosecond(0);
let value = match granularity {
"second" | "millisecond" | "microsecond" => value,
"second" => value,
"minute" => value.and_then(|d| d.with_second(0)),
"hour" => value
.and_then(|d| d.with_second(0))
Expand Down Expand Up @@ -262,6 +266,55 @@ fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
Ok(value.unwrap().timestamp_nanos())
}

fn _date_trunc(
tu: TimeUnit,
value: &Option<i64>,
granularity: &str,
f: impl Fn(Option<i64>) -> Result<Option<i64>>,
) -> Result<Option<i64>, DataFusionError> {
let scale = match tu {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
TimeUnit::Nanosecond => 1,
};

let Some(value) = value else {
return Ok(None);
};

// convert to nanoseconds
let Some(nano) = (f)(Some(value * scale))? else {
return Ok(None);
};

let result = match tu {
TimeUnit::Second => match granularity {
"minute" => Some(nano / 1_000_000_000 / 60 * 60),
_ => Some(nano / 1_000_000_000),
},
TimeUnit::Millisecond => match granularity {
"minute" => Some(nano / 1_000_000 / 1_000 / 60 * 1_000 * 60),
"second" => Some(nano / 1_000_000 / 1_000 * 1_000),
_ => Some(nano / 1_000_000),
},
TimeUnit::Microsecond => match granularity {
"minute" => Some(nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000),
"second" => Some(nano / 1_000 / 1_000_000 * 1_000_000),
"millisecond" => Some(nano / 1_000 / 1_000 * 1_000),
_ => Some(nano / 1_000),
},
_ => match granularity {
"minute" => Some(nano / 1_000_000_000 / 60 * 1_000_000_000 * 60),
"second" => Some(nano / 1_000_000_000 * 1_000_000_000),
"millisecond" => Some(nano / 1_000_000 * 1_000_000),
"microsecond" => Some(nano / 1_000 * 1_000),
_ => Some(nano),
},
Comment on lines +291 to +313
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks correct.

};
Ok(result)
}

/// date_trunc SQL function
pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let (granularity, array) = (&args[0], &args[1]);
Expand All @@ -282,49 +335,81 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let nano = (f)(*v)?;

match granularity.as_str() {
"minute" => {
// trunc to minute
let second = ScalarValue::TimestampNanosecond(
nano.map(|nano| nano / 1_000_000_000 * 1_000_000_000),
tz_opt.clone(),
);
ColumnarValue::Scalar(second)
let value = _date_trunc(TimeUnit::Nanosecond, v, granularity.as_str(), f)?;
let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Microsecond, v, granularity.as_str(), f)?;
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Millisecond, v, granularity.as_str(), f)?;
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Second, v, granularity.as_str(), f)?;
let value = ScalarValue::TimestampSecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Array(array) => {
Copy link
Member Author

@Weijun-H Weijun-H Jun 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Array, should it return the values based on the input type or just return TimestampNanoSecond consistently 🤔?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Array, should it return the values based on the input type

It should do the same as is done for the Scalar version.

My understanding is that this PR changes the Scalar version to return a type based on the input types (not TimestampNanoSecond) so the array version should as well

let array_type = array.data_type();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must be missing something here -- it seems like granularity is not used for the array variants

So like date_trunc(x, 'minutes') will return the same value as date_trunc(x, 'seconds') which doesn't seem right

Maybe that indicates missing test coverage 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use f function pointer, which uses granularity as parameter

match array_type {
DataType::Timestamp(TimeUnit::Second, _) => {
let array = as_timestamp_second_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Second, &x, granularity.as_str(), f)
})
.collect::<Result<TimestampSecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
"second" => {
// trunc to second
let mill = ScalarValue::TimestampNanosecond(
alamb marked this conversation as resolved.
Show resolved Hide resolved
nano.map(|nano| nano / 1_000_000 * 1_000_000),
tz_opt.clone(),
);
ColumnarValue::Scalar(mill)
DataType::Timestamp(TimeUnit::Millisecond, _) => {
let array = as_timestamp_millisecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Millisecond,
&x,
granularity.as_str(),
f,
)
})
.collect::<Result<TimestampMillisecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
"millisecond" => {
// trunc to microsecond
let micro = ScalarValue::TimestampNanosecond(
nano.map(|nano| nano / 1_000 * 1_000),
tz_opt.clone(),
);
ColumnarValue::Scalar(micro)
DataType::Timestamp(TimeUnit::Microsecond, _) => {
let array = as_timestamp_microsecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Microsecond,
&x,
granularity.as_str(),
f,
)
})
.collect::<Result<TimestampMicrosecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
_ => {
// trunc to nanosecond
let nano = ScalarValue::TimestampNanosecond(nano, tz_opt.clone());
ColumnarValue::Scalar(nano)
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Nanosecond, &x, granularity.as_str(), f)
})
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
}
}
}
ColumnarValue::Array(array) => {
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(f)
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
}
_ => {
return Err(DataFusionError::Execution(
"second argument of `date_trunc` must be nanosecond timestamp scalar or array".to_string(),
Expand Down