|
17 | 17 |
|
18 | 18 | use std::sync::Arc; |
19 | 19 |
|
| 20 | +use arrow::array::timezone::Tz; |
20 | 21 | use arrow::array::{ |
21 | 22 | Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray, |
22 | 23 | StringArrayType, StringViewArray, |
23 | 24 | }; |
24 | 25 | use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos; |
25 | | -use arrow::datatypes::DataType; |
| 26 | +use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second}; |
| 27 | +use arrow::datatypes::{ArrowTimestampType, DataType}; |
26 | 28 | use chrono::format::{parse, Parsed, StrftimeItems}; |
27 | 29 | use chrono::LocalResult::Single; |
28 | | -use chrono::{DateTime, TimeZone, Utc}; |
| 30 | +use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc}; |
| 31 | +use std::ops::Add; |
29 | 32 |
|
30 | 33 | use datafusion_common::cast::as_generic_string_array; |
31 | 34 | use datafusion_common::{ |
32 | | - exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result, |
33 | | - ScalarType, ScalarValue, |
| 35 | + exec_datafusion_err, exec_err, internal_datafusion_err, unwrap_or_internal_err, |
| 36 | + DataFusionError, Result, ScalarType, ScalarValue, |
34 | 37 | }; |
35 | 38 | use datafusion_expr::ColumnarValue; |
36 | 39 |
|
37 | 40 | /// Error message returned when a nanosecond conversion is requested for a date outside the supported interval
38 | 41 | const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804"; |
39 | 42 |
|
| 43 | +/// Adjusts a timestamp to local time by applying the timezone offset. |
| 44 | +pub fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> { |
| 45 | + fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>> |
| 46 | + where |
| 47 | + F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>, |
| 48 | + { |
| 49 | + match converter(ts) { |
| 50 | + MappedLocalTime::Ambiguous(earliest, latest) => exec_err!( |
| 51 | + "Ambiguous timestamp. Do you mean {:?} or {:?}", |
| 52 | + earliest, |
| 53 | + latest |
| 54 | + ), |
| 55 | + MappedLocalTime::None => exec_err!( |
| 56 | + "The local time does not exist because there is a gap in the local time." |
| 57 | + ), |
| 58 | + MappedLocalTime::Single(date_time) => Ok(date_time), |
| 59 | + } |
| 60 | + } |
| 61 | + |
| 62 | + let date_time = match T::UNIT { |
| 63 | + Nanosecond => Utc.timestamp_nanos(ts), |
| 64 | + Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?, |
| 65 | + Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?, |
| 66 | + Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?, |
| 67 | + }; |
| 68 | + |
| 69 | + let offset_seconds: i64 = tz |
| 70 | + .offset_from_utc_datetime(&date_time.naive_utc()) |
| 71 | + .fix() |
| 72 | + .local_minus_utc() as i64; |
| 73 | + |
| 74 | + let adjusted_date_time = date_time.add( |
| 75 | + TimeDelta::try_seconds(offset_seconds) |
| 76 | + .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?, |
| 77 | + ); |
| 78 | + |
| 79 | + // convert back to i64 |
| 80 | + match T::UNIT { |
| 81 | + Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(|| { |
| 82 | + internal_datafusion_err!( |
| 83 | + "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807" |
| 84 | + ) |
| 85 | + }), |
| 86 | + Microsecond => Ok(adjusted_date_time.timestamp_micros()), |
| 87 | + Millisecond => Ok(adjusted_date_time.timestamp_millis()), |
| 88 | + Second => Ok(adjusted_date_time.timestamp()), |
| 89 | + } |
| 90 | +} |
| 91 | + |
40 | 92 | /// Calls string_to_timestamp_nanos and converts the error type |
41 | 93 | pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> { |
42 | 94 | string_to_timestamp_nanos(s).map_err(|e| e.into()) |
|
0 commit comments