Skip to content

Commit

Permalink
parse string to timestamp (#692)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelLeeHZ authored Mar 6, 2023
1 parent 95ea870 commit f79890b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions query_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
# In alphabetical order
arrow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
Expand Down
96 changes: 84 additions & 12 deletions query_engine/src/logical_optimizer/type_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use std::{mem, sync::Arc};

use arrow::{compute, compute::kernels::cast_utils::string_to_timestamp_nanos};
use arrow::{compute, compute::kernels::cast_utils::string_to_timestamp_nanos, error::ArrowError};
use chrono::{Local, LocalResult, NaiveDateTime, TimeZone, Utc};
use datafusion::{
arrow::datatypes::DataType,
common::DFSchemaRef,
Expand Down Expand Up @@ -186,7 +187,10 @@ impl<'a> TypeRewriter<'a> {
fn cast_scalar_value(value: &ScalarValue, data_type: &DataType) -> Result<ScalarValue> {
if let DataType::Timestamp(_, _) = data_type {
if let ScalarValue::Utf8(Some(v)) = value {
return string_to_timestamp_ms(v);
return match string_to_timestamp_ms_workaround(v) {
Ok(v) => Ok(v),
_ => string_to_timestamp_ms(v),
};
}
}

Expand Down Expand Up @@ -281,14 +285,56 @@ impl<'a> ExprRewriter for TypeRewriter<'a> {
}

fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
Ok(ScalarValue::TimestampMillisecond(
Some(
string_to_timestamp_nanos(string)
.map(|t| t / 1_000_000)
.map_err(DataFusionError::from)?,
),
None,
))
let ts = string_to_timestamp_nanos(string)
.map(|t| t / 1_000_000)
.map_err(DataFusionError::from)?;
Ok(ScalarValue::TimestampMillisecond(Some(ts), None))
}

// TODO(lee): remove following codes after PR(https://github.com/apache/arrow-rs/pull/3787) merged
fn string_to_timestamp_ms_workaround(string: &str) -> Result<ScalarValue> {
// Because function `string_to_timestamp_nanos` returns a NaiveDateTime's
// nanoseconds from a string without a specify time zone, We need to convert
// it to local timestamp.

// without a timezone specifier as a local time, using 'T' as a separator
// Example: 2020-09-08T13:42:29.190855
if let Ok(ts) = NaiveDateTime::parse_from_str(string, "%Y-%m-%dT%H:%M:%S%.f") {
let mills = naive_datetime_to_timestamp(string, ts).map_err(DataFusionError::from)?;
return Ok(ScalarValue::TimestampMillisecond(Some(mills), None));
}

// without a timezone specifier as a local time, using ' ' as a separator
// Example: 2020-09-08 13:42:29.190855
if let Ok(ts) = NaiveDateTime::parse_from_str(string, "%Y-%m-%d %H:%M:%S%.f") {
let mills = naive_datetime_to_timestamp(string, ts).map_err(DataFusionError::from)?;
return Ok(ScalarValue::TimestampMillisecond(Some(mills), None));
}

Err(ArrowError::CastError(format!(
"Error parsing '{string}' as timestamp: local time representation is invalid"
)))
.map_err(DataFusionError::from)
}

/// Converts the naive datetime (which has no specific timezone) to a
/// nanosecond epoch timestamp relative to UTC.
/// copy from:https://github.com/apache/arrow-rs/blob/6a6e7f72331aa6589aa676577571ffed98d52394/arrow/src/compute/kernels/cast_utils.rs#L208
fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64, ArrowError> {
let l = Local {};

match l.from_local_datetime(&datetime) {
LocalResult::None => Err(ArrowError::CastError(format!(
"Error parsing '{s}' as timestamp: local time representation is invalid"
))),
LocalResult::Single(local_datetime) => {
Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000)
}

LocalResult::Ambiguous(local_datetime, _) => {
Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000)
}
}
}

enum TimestampType {
Expand All @@ -315,12 +361,14 @@ mod tests {
use std::collections::HashMap;

use arrow::datatypes::TimeUnit;
use chrono::{NaiveDate, NaiveTime};
use datafusion::{
common::{DFField, DFSchema},
prelude::col,
};

use super::*;
use crate::logical_optimizer::type_conversion;

fn expr_test_schema() -> DFSchemaRef {
Arc::new(
Expand Down Expand Up @@ -445,7 +493,7 @@ mod tests {

#[test]
fn test_type_conversion_timestamp() {
let date_string = "2021-09-07 16:00:00".to_string();
let date_string = "2021-09-07T16:00:00Z".to_string();
let schema = expr_test_schema();
let mut rewriter = TypeRewriter {
schemas: vec![&schema],
Expand Down Expand Up @@ -498,7 +546,7 @@ mod tests {
);

// Timestamp c6 between "2021-09-07 16:00:00" and "2021-09-07 17:00:00"
let date_string2 = "2021-09-07 17:00:00".to_string();
let date_string2 = "2021-09-07T17:00:00Z".to_string();
let exp = Expr::Between(Between {
expr: Box::new(col("c6")),
negated: false,
Expand Down Expand Up @@ -530,4 +578,28 @@ mod tests {
})
);
}

#[test]
fn test_string_to_timestamp_ms_workaround() {
let date_string = [
"2021-09-07T16:00:00+08:00",
"2021-09-07 16:00:00+08:00",
"2021-09-07T16:00:00Z",
"2021-09-07 16:00:00Z",
];
for string in date_string {
let result = type_conversion::string_to_timestamp_ms_workaround(string);
assert!(result.is_err());
}

let date_string = "2021-09-07 16:00:00".to_string();
let d = NaiveDate::from_ymd_opt(2021, 9, 7).unwrap();
let t = NaiveTime::from_hms_milli_opt(16, 0, 0, 0).unwrap();
let dt = NaiveDateTime::new(d, t);
let expect = naive_datetime_to_timestamp(&date_string, dt).unwrap();
let result = type_conversion::string_to_timestamp_ms_workaround(&date_string);
if let Ok(ScalarValue::TimestampMillisecond(Some(mills), _)) = result {
assert_eq!(mills, expect)
}
}
}

0 comments on commit f79890b

Please sign in to comment.