-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Support timestamps and steps of less than a day for range/generate_series #12400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e9fc1be
2596358
ab2a913
b2004bd
8be72ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,19 +18,31 @@ | |
//! [`ScalarUDFImpl`] definitions for range and gen_series functions. | ||
|
||
use crate::utils::make_scalar_function; | ||
use arrow::array::{Array, ArrayRef, Date32Builder, Int64Array, ListArray, ListBuilder}; | ||
use arrow::array::{Array, ArrayRef, Int64Array, ListArray, ListBuilder}; | ||
use arrow::datatypes::{DataType, Field}; | ||
use arrow_array::types::{Date32Type, IntervalMonthDayNanoType}; | ||
use arrow_array::NullArray; | ||
use arrow_array::builder::{Date32Builder, TimestampNanosecondBuilder}; | ||
use arrow_array::temporal_conversions::as_datetime_with_timezone; | ||
use arrow_array::timezone::Tz; | ||
use arrow_array::types::{ | ||
Date32Type, IntervalMonthDayNanoType, TimestampNanosecondType as TSNT, | ||
}; | ||
use arrow_array::{NullArray, TimestampNanosecondArray}; | ||
use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer}; | ||
use arrow_schema::DataType::*; | ||
use arrow_schema::IntervalUnit::MonthDayNano; | ||
use datafusion_common::cast::{as_date32_array, as_int64_array, as_interval_mdn_array}; | ||
use datafusion_common::{exec_err, not_impl_datafusion_err, Result}; | ||
use arrow_schema::TimeUnit::Nanosecond; | ||
use datafusion_common::cast::{ | ||
as_date32_array, as_int64_array, as_interval_mdn_array, as_timestamp_nanosecond_array, | ||
}; | ||
use datafusion_common::{ | ||
exec_datafusion_err, exec_err, internal_err, not_impl_datafusion_err, Result, | ||
}; | ||
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; | ||
use itertools::Itertools; | ||
use std::any::Any; | ||
use std::cmp::Ordering; | ||
use std::iter::from_fn; | ||
use std::str::FromStr; | ||
use std::sync::Arc; | ||
|
||
make_udf_expr_and_func!( | ||
|
@@ -78,7 +90,7 @@ impl ScalarUDFImpl for Range { | |
UInt16 => Ok(Int64), | ||
UInt32 => Ok(Int64), | ||
UInt64 => Ok(Int64), | ||
Timestamp(_, _) => Ok(Date32), | ||
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())), | ||
Date32 => Ok(Date32), | ||
Date64 => Ok(Date32), | ||
Utf8 => Ok(Date32), | ||
|
@@ -109,8 +121,11 @@ impl ScalarUDFImpl for Range { | |
match args[0].data_type() { | ||
Int64 => make_scalar_function(|args| gen_range_inner(args, false))(args), | ||
Date32 => make_scalar_function(|args| gen_range_date(args, false))(args), | ||
_ => { | ||
exec_err!("unsupported type for range") | ||
Timestamp(_, _) => { | ||
make_scalar_function(|args| gen_range_timestamp(args, false))(args) | ||
} | ||
dt => { | ||
exec_err!("unsupported type for RANGE. Expected Int64, Date32 or Timestamp, got: {dt}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤️ for a much better error |
||
} | ||
} | ||
} | ||
|
@@ -152,8 +167,8 @@ impl ScalarUDFImpl for GenSeries { | |
&self.signature | ||
} | ||
|
||
fn coerce_types(&self, _arg_types: &[DataType]) -> Result<Vec<DataType>> { | ||
_arg_types | ||
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { | ||
arg_types | ||
.iter() | ||
.map(|arg_type| match arg_type { | ||
Null => Ok(Null), | ||
|
@@ -165,7 +180,7 @@ impl ScalarUDFImpl for GenSeries { | |
UInt16 => Ok(Int64), | ||
UInt32 => Ok(Int64), | ||
UInt64 => Ok(Int64), | ||
Timestamp(_, _) => Ok(Date32), | ||
Timestamp(_, tz) => Ok(Timestamp(Nanosecond, tz.clone())), | ||
Date32 => Ok(Date32), | ||
Date64 => Ok(Date32), | ||
Utf8 => Ok(Date32), | ||
|
@@ -196,9 +211,12 @@ impl ScalarUDFImpl for GenSeries { | |
match args[0].data_type() { | ||
Int64 => make_scalar_function(|args| gen_range_inner(args, true))(args), | ||
Date32 => make_scalar_function(|args| gen_range_date(args, true))(args), | ||
Timestamp(_, _) => { | ||
make_scalar_function(|args| gen_range_timestamp(args, true))(args) | ||
} | ||
dt => { | ||
exec_err!( | ||
"unsupported type for gen_series. Expected Int64 or Date32, got: {}", | ||
"unsupported type for GENERATE_SERIES. Expected Int64, Date32 or Timestamp, got: {}", | ||
dt | ||
) | ||
} | ||
|
@@ -334,7 +352,7 @@ fn gen_range_iter( | |
} | ||
} | ||
|
||
fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> { | ||
fn gen_range_date(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> { | ||
if args.len() != 3 { | ||
return exec_err!("arguments length does not match"); | ||
} | ||
|
@@ -372,7 +390,7 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> { | |
} | ||
|
||
let neg = months < 0 || days < 0; | ||
if !include_upper { | ||
if !include_upper_bound { | ||
stop = Date32Type::subtract_month_day_nano(stop, step); | ||
} | ||
let mut new_date = start; | ||
|
@@ -394,3 +412,136 @@ fn gen_range_date(args: &[ArrayRef], include_upper: bool) -> Result<ArrayRef> { | |
|
||
Ok(arr) | ||
} | ||
|
||
fn gen_range_timestamp(args: &[ArrayRef], include_upper_bound: bool) -> Result<ArrayRef> { | ||
if args.len() != 3 { | ||
return exec_err!( | ||
"Arguments length must be 3 for {}", | ||
if include_upper_bound { | ||
"GENERATE_SERIES" | ||
} else { | ||
"RANGE" | ||
} | ||
); | ||
} | ||
|
||
// coerce_types fn should coerce all types to Timestamp(Nanosecond, tz) | ||
let (start_arr, start_tz_opt) = cast_timestamp_arg(&args[0], include_upper_bound)?; | ||
let (stop_arr, stop_tz_opt) = cast_timestamp_arg(&args[1], include_upper_bound)?; | ||
let step_arr = as_interval_mdn_array(&args[2])?; | ||
let start_tz = parse_tz(start_tz_opt)?; | ||
let stop_tz = parse_tz(stop_tz_opt)?; | ||
|
||
// values are timestamps | ||
let values_builder = start_tz_opt | ||
.clone() | ||
.map_or_else(TimestampNanosecondBuilder::new, |start_tz_str| { | ||
TimestampNanosecondBuilder::new().with_timezone(start_tz_str) | ||
}); | ||
let mut list_builder = ListBuilder::new(values_builder); | ||
|
||
for idx in 0..start_arr.len() { | ||
if start_arr.is_null(idx) || stop_arr.is_null(idx) || step_arr.is_null(idx) { | ||
list_builder.append_null(); | ||
continue; | ||
} | ||
|
||
let start = start_arr.value(idx); | ||
let stop = stop_arr.value(idx); | ||
let step = step_arr.value(idx); | ||
|
||
let (months, days, ns) = IntervalMonthDayNanoType::to_parts(step); | ||
if months == 0 && days == 0 && ns == 0 { | ||
return exec_err!( | ||
"Interval argument to {} must not be 0", | ||
if include_upper_bound { | ||
"GENERATE_SERIES" | ||
} else { | ||
"RANGE" | ||
} | ||
); | ||
} | ||
|
||
let neg = TSNT::add_month_day_nano(start, step, start_tz) | ||
.ok_or(exec_datafusion_err!( | ||
"Cannot generate timestamp range where start + step overflows" | ||
))? | ||
.cmp(&start) | ||
== Ordering::Less; | ||
|
||
let stop_dt = as_datetime_with_timezone::<TSNT>(stop, stop_tz).ok_or( | ||
exec_datafusion_err!( | ||
"Cannot generate timestamp for stop: {}: {:?}", | ||
stop, | ||
stop_tz | ||
), | ||
)?; | ||
|
||
let mut current = start; | ||
let mut current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz).ok_or( | ||
exec_datafusion_err!( | ||
"Cannot generate timestamp for start: {}: {:?}", | ||
Omega359 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
current, | ||
start_tz | ||
), | ||
)?; | ||
|
||
let values = from_fn(|| { | ||
if (include_upper_bound | ||
&& ((neg && current_dt < stop_dt) || (!neg && current_dt > stop_dt))) | ||
|| (!include_upper_bound | ||
&& ((neg && current_dt <= stop_dt) | ||
|| (!neg && current_dt >= stop_dt))) | ||
{ | ||
return None; | ||
} | ||
|
||
let prev_current = current; | ||
|
||
if let Some(ts) = TSNT::add_month_day_nano(current, step, start_tz) { | ||
current = ts; | ||
current_dt = as_datetime_with_timezone::<TSNT>(current, start_tz)?; | ||
|
||
Some(Some(prev_current)) | ||
} else { | ||
// we failed to parse the timestamp here so terminate the series | ||
None | ||
} | ||
}); | ||
|
||
list_builder.append_value(values); | ||
} | ||
|
||
let arr = Arc::new(list_builder.finish()); | ||
|
||
Ok(arr) | ||
} | ||
|
||
fn cast_timestamp_arg( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: You could potentially use And then call I personally found the use of the word There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My programming roots are showing through :) I can take a look at that |
||
arg: &ArrayRef, | ||
include_upper: bool, | ||
) -> Result<(&TimestampNanosecondArray, &Option<Arc<str>>)> { | ||
match arg.data_type() { | ||
Timestamp(Nanosecond, tz_opt) => { | ||
Ok((as_timestamp_nanosecond_array(arg)?, tz_opt)) | ||
} | ||
_ => { | ||
internal_err!( | ||
"Unexpected argument type for {} : {}", | ||
if include_upper { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this error block is third time showing up, perhaps we can factor it out |
||
"GENERATE_SERIES" | ||
} else { | ||
"RANGE" | ||
}, | ||
arg.data_type() | ||
) | ||
} | ||
} | ||
} | ||
|
||
fn parse_tz(tz: &Option<Arc<str>>) -> Result<Tz> { | ||
let tz = tz.as_ref().map_or_else(|| "+00", |s| s); | ||
|
||
Tz::from_str(tz) | ||
.map_err(|op| exec_datafusion_err!("failed to parse timezone {tz}: {:?}", op)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can probably factor our this block to avoid make a modifications twice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is so much duplication in these UDF's. I think it's best to push the cleanup of that to a separate PR and keep this one as small as possible.