-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make 'date_trunc' returns the same type as its input #6654
Changes from all commits
63eb6c1
9f8d128
8aabc4f
4dd152a
8df71e8
c0f0ffd
5aec1f9
3071716
d68ae79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -215,13 +215,17 @@ fn quarter_month(date: &NaiveDateTime) -> u32 { | |
} | ||
|
||
fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> { | ||
if granularity == "millisecond" || granularity == "microsecond" { | ||
return Ok(value); | ||
} | ||
Comment on lines
+218
to
+220
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm? Why this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think #6654 (comment) is the answer -- perhaps I can remove this to make it easier to understand There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I find this very confusing -- I think it is because for those granularities, the conversion to chrono::DateTime and back is unecessary (as only sec/min/hour/etc are non uniform in width) -- I will try and make it clearer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, is it? #6654 (comment) notes about the truncation of timestamps to seconds. But my question is why pulling "millisecond" and "microsecond" earlier which basically avoids There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to the utilization of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean E.g, timestamp is 1.001000001 (1s + 1 ms + 1 ns). After There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, I think that is correct. I played around with it and we can simplify the logic like this which I think may make it easier to understand (the test still pass). If you like this better (I do) I can make it a PR (follow on to #6783) diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index 3a36e8a489..89ec508024 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -215,42 +215,49 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
}
fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
- if granularity == "millisecond" || granularity == "microsecond" {
- return Ok(value);
- }
+ let value = timestamp_ns_to_datetime(value).ok_or_else(|| {
+ DataFusionError::Execution(format!("Timestamp {value} out of range"))
+ })?;
+
+ let value = Some(value);
- let value = timestamp_ns_to_datetime(value)
- .ok_or_else(|| {
- DataFusionError::Execution(format!("Timestamp {value} out of range"))
- })?
- .with_nanosecond(0);
let value = match granularity {
- "second" => value,
- "minute" => value.and_then(|d| d.with_second(0)),
+ "millisecond" => value,
+ "microsecond" => value,
+ "second" => value.and_then(|d| d.with_nanosecond(0)),
+ "minute" => value
+ .and_then(|d| d.with_nanosecond(0))
+ .and_then(|d| d.with_second(0)),
"hour" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0)),
"day" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0)),
"week" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.map(|d| d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)),
"month" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0)),
"quarter" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0))
.and_then(|d| d.with_month(quarter_month(&d))),
"year" => value
+ .and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I think it looks clearer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
let value = timestamp_ns_to_datetime(value) | ||
.ok_or_else(|| { | ||
DataFusionError::Execution(format!("Timestamp {value} out of range")) | ||
})? | ||
.with_nanosecond(0); | ||
let value = match granularity { | ||
"second" | "millisecond" | "microsecond" => value, | ||
"second" => value, | ||
"minute" => value.and_then(|d| d.with_second(0)), | ||
"hour" => value | ||
.and_then(|d| d.with_second(0)) | ||
|
@@ -262,6 +266,55 @@ fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> { | |
Ok(value.unwrap().timestamp_nanos()) | ||
} | ||
|
||
fn _date_trunc( | ||
tu: TimeUnit, | ||
value: &Option<i64>, | ||
granularity: &str, | ||
f: impl Fn(Option<i64>) -> Result<Option<i64>>, | ||
) -> Result<Option<i64>, DataFusionError> { | ||
let scale = match tu { | ||
TimeUnit::Second => 1_000_000_000, | ||
TimeUnit::Millisecond => 1_000_000, | ||
TimeUnit::Microsecond => 1_000, | ||
TimeUnit::Nanosecond => 1, | ||
}; | ||
|
||
let Some(value) = value else { | ||
return Ok(None); | ||
}; | ||
|
||
// convert to nanoseconds | ||
let Some(nano) = (f)(Some(value * scale))? else { | ||
return Ok(None); | ||
}; | ||
|
||
let result = match tu { | ||
TimeUnit::Second => match granularity { | ||
"minute" => Some(nano / 1_000_000_000 / 60 * 60), | ||
_ => Some(nano / 1_000_000_000), | ||
}, | ||
TimeUnit::Millisecond => match granularity { | ||
"minute" => Some(nano / 1_000_000 / 1_000 / 60 * 1_000 * 60), | ||
"second" => Some(nano / 1_000_000 / 1_000 * 1_000), | ||
_ => Some(nano / 1_000_000), | ||
}, | ||
TimeUnit::Microsecond => match granularity { | ||
"minute" => Some(nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000), | ||
"second" => Some(nano / 1_000 / 1_000_000 * 1_000_000), | ||
"millisecond" => Some(nano / 1_000 / 1_000 * 1_000), | ||
_ => Some(nano / 1_000), | ||
}, | ||
_ => match granularity { | ||
"minute" => Some(nano / 1_000_000_000 / 60 * 1_000_000_000 * 60), | ||
"second" => Some(nano / 1_000_000_000 * 1_000_000_000), | ||
"millisecond" => Some(nano / 1_000_000 * 1_000_000), | ||
"microsecond" => Some(nano / 1_000 * 1_000), | ||
_ => Some(nano), | ||
}, | ||
Comment on lines
+291
to
+313
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks correct. |
||
}; | ||
Ok(result) | ||
} | ||
|
||
/// date_trunc SQL function | ||
pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> { | ||
let (granularity, array) = (&args[0], &args[1]); | ||
|
@@ -282,49 +335,81 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> { | |
|
||
Ok(match array { | ||
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { | ||
let nano = (f)(*v)?; | ||
|
||
match granularity.as_str() { | ||
"minute" => { | ||
// trunc to minute | ||
let second = ScalarValue::TimestampNanosecond( | ||
nano.map(|nano| nano / 1_000_000_000 * 1_000_000_000), | ||
tz_opt.clone(), | ||
); | ||
ColumnarValue::Scalar(second) | ||
let value = _date_trunc(TimeUnit::Nanosecond, v, granularity.as_str(), f)?; | ||
let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone()); | ||
ColumnarValue::Scalar(value) | ||
} | ||
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => { | ||
let value = _date_trunc(TimeUnit::Microsecond, v, granularity.as_str(), f)?; | ||
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone()); | ||
ColumnarValue::Scalar(value) | ||
} | ||
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => { | ||
let value = _date_trunc(TimeUnit::Millisecond, v, granularity.as_str(), f)?; | ||
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone()); | ||
ColumnarValue::Scalar(value) | ||
} | ||
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => { | ||
let value = _date_trunc(TimeUnit::Second, v, granularity.as_str(), f)?; | ||
let value = ScalarValue::TimestampSecond(value, tz_opt.clone()); | ||
ColumnarValue::Scalar(value) | ||
} | ||
ColumnarValue::Array(array) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For Array, should it return the values based on the input type or just return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It should do the same as is done for the Scalar version. My understanding is that this PR changes the Scalar version to return a type based on the input types (not TimestampNanoSecond) so the array version should as well |
||
let array_type = array.data_type(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I must be missing something here -- it seems like So like Maybe that indicates missing test coverage 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use |
||
match array_type { | ||
DataType::Timestamp(TimeUnit::Second, _) => { | ||
let array = as_timestamp_second_array(array)?; | ||
let array = array | ||
.iter() | ||
.map(|x| { | ||
_date_trunc(TimeUnit::Second, &x, granularity.as_str(), f) | ||
}) | ||
.collect::<Result<TimestampSecondArray>>()?; | ||
ColumnarValue::Array(Arc::new(array)) | ||
} | ||
"second" => { | ||
// trunc to second | ||
let mill = ScalarValue::TimestampNanosecond( | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
nano.map(|nano| nano / 1_000_000 * 1_000_000), | ||
tz_opt.clone(), | ||
); | ||
ColumnarValue::Scalar(mill) | ||
DataType::Timestamp(TimeUnit::Millisecond, _) => { | ||
let array = as_timestamp_millisecond_array(array)?; | ||
let array = array | ||
.iter() | ||
.map(|x| { | ||
_date_trunc( | ||
TimeUnit::Millisecond, | ||
&x, | ||
granularity.as_str(), | ||
f, | ||
) | ||
}) | ||
.collect::<Result<TimestampMillisecondArray>>()?; | ||
ColumnarValue::Array(Arc::new(array)) | ||
} | ||
"millisecond" => { | ||
// trunc to microsecond | ||
let micro = ScalarValue::TimestampNanosecond( | ||
nano.map(|nano| nano / 1_000 * 1_000), | ||
tz_opt.clone(), | ||
); | ||
ColumnarValue::Scalar(micro) | ||
DataType::Timestamp(TimeUnit::Microsecond, _) => { | ||
let array = as_timestamp_microsecond_array(array)?; | ||
let array = array | ||
.iter() | ||
.map(|x| { | ||
_date_trunc( | ||
TimeUnit::Microsecond, | ||
&x, | ||
granularity.as_str(), | ||
f, | ||
) | ||
}) | ||
.collect::<Result<TimestampMicrosecondArray>>()?; | ||
ColumnarValue::Array(Arc::new(array)) | ||
} | ||
_ => { | ||
// trunc to nanosecond | ||
let nano = ScalarValue::TimestampNanosecond(nano, tz_opt.clone()); | ||
ColumnarValue::Scalar(nano) | ||
let array = as_timestamp_nanosecond_array(array)?; | ||
let array = array | ||
.iter() | ||
.map(|x| { | ||
_date_trunc(TimeUnit::Nanosecond, &x, granularity.as_str(), f) | ||
}) | ||
.collect::<Result<TimestampNanosecondArray>>()?; | ||
|
||
ColumnarValue::Array(Arc::new(array)) | ||
} | ||
} | ||
} | ||
ColumnarValue::Array(array) => { | ||
let array = as_timestamp_nanosecond_array(array)?; | ||
let array = array | ||
.iter() | ||
.map(f) | ||
.collect::<Result<TimestampNanosecondArray>>()?; | ||
|
||
ColumnarValue::Array(Arc::new(array)) | ||
} | ||
_ => { | ||
return Err(DataFusionError::Execution( | ||
"second argument of `date_trunc` must be nanosecond timestamp scalar or array".to_string(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this clause seems unreachable given the change above to match
BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin => {
I would personally suggest keeping this change and removing the
BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin => {
change as it make the code more explicy