Commit 5900b4c

velvia, Evan Chan, and alamb authored

to_timestamp_millis(), to_timestamp_micros(), to_timestamp_seconds() (#567)

* to_timestamp_millis(): support casting to Timestamp(Milliseconds, _) from Int64
* Add testing setup to instructions
* to_timestamp_millis(): Convert timestamp strings to TimestampMillis
* [functions] Add to_timestamp_micros() and to_timestamp_seconds() functions
* Update datafusion/tests/sql.rs
  Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* CR feedback and fix build
* Add ability for to_timestamp_xxx() functions to cast from other Timestamp types
* For consistency, let to_timestamp() also perform casts
* Prettier / clippy
* Add docs for to_timestamp() functions

Co-authored-by: Evan Chan <evan@urbanlogiq.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 330c809 commit 5900b4c

13 files changed, +618 -68 lines changed

DEVELOPERS.md

Lines changed: 8 additions & 1 deletion
@@ -33,6 +33,13 @@ DataFusion is written in Rust and it uses a standard rust toolkit:
 - `cargo test` to test
 - etc.

+Testing setup:
+
+- `git submodule init`
+- `git submodule update`
+- `export PARQUET_TEST_DATA=parquet_testing/`
+- `export ARROW_TEST_DATA=testing/data/`
+
 ## How to add a new scalar function

 Below is a checklist of what you need to do to add a new scalar function to DataFusion:
@@ -47,7 +54,7 @@ Below is a checklist of what you need to do to add a new scalar function to Data
 - a new entry to `FromStr` with the name of the function as called by SQL
 - a new line in `return_type` with the expected return type of the function, given an incoming type
 - a new line in `signature` with the signature of the function (number and types of its arguments)
-- a new line in `create_physical_expr` mapping the built-in to the implementation
+- a new line in `create_physical_expr`/`create_physical_fun` mapping the built-in to the implementation
 - tests to the function.
 - In [tests/sql.rs](datafusion/tests/sql.rs), add a new test where the function is called through SQL against well known data and returns the expected result.
 - In [src/logical_plan/expr](datafusion/src/logical_plan/expr.rs), add:

README.md

Lines changed: 29 additions & 25 deletions
@@ -197,6 +197,10 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
 - [ ] Basic date functions
 - [ ] Basic time functions
 - [x] Basic timestamp functions
+- [x] [to_timestamp](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp)
+- [x] [to_timestamp_millis](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_millis)
+- [x] [to_timestamp_micros](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_micros)
+- [x] [to_timestamp_seconds](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_seconds)
 - nested functions
 - [x] Array of columns
 - [x] Schema Queries
@@ -320,31 +324,31 @@ execution. The SQL types from
 [sqlparser-rs](https://github.com/ballista-compute/sqlparser-rs/blob/main/src/ast/data_type.rs#L57)
 are mapped to Arrow types according to the following table

-| SQL Data Type | Arrow DataType                  |
-| ------------- | ------------------------------- |
-| `CHAR`        | `Utf8`                          |
-| `VARCHAR`     | `Utf8`                          |
-| `UUID`        | _Not yet supported_             |
-| `CLOB`        | _Not yet supported_             |
-| `BINARY`      | _Not yet supported_             |
-| `VARBINARY`   | _Not yet supported_             |
-| `DECIMAL`     | `Float64`                       |
-| `FLOAT`       | `Float32`                       |
-| `SMALLINT`    | `Int16`                         |
-| `INT`         | `Int32`                         |
-| `BIGINT`      | `Int64`                         |
-| `REAL`        | `Float64`                       |
-| `DOUBLE`      | `Float64`                       |
-| `BOOLEAN`     | `Boolean`                       |
-| `DATE`        | `Date32`                        |
-| `TIME`        | `Time64(TimeUnit::Millisecond)` |
-| `TIMESTAMP`   | `Date64`                        |
-| `INTERVAL`    | _Not yet supported_             |
-| `REGCLASS`    | _Not yet supported_             |
-| `TEXT`        | _Not yet supported_             |
-| `BYTEA`       | _Not yet supported_             |
-| `CUSTOM`      | _Not yet supported_             |
-| `ARRAY`       | _Not yet supported_             |
+| SQL Data Type | Arrow DataType                    |
+| ------------- | --------------------------------- |
+| `CHAR`        | `Utf8`                            |
+| `VARCHAR`     | `Utf8`                            |
+| `UUID`        | _Not yet supported_               |
+| `CLOB`        | _Not yet supported_               |
+| `BINARY`      | _Not yet supported_               |
+| `VARBINARY`   | _Not yet supported_               |
+| `DECIMAL`     | `Float64`                         |
+| `FLOAT`       | `Float32`                         |
+| `SMALLINT`    | `Int16`                           |
+| `INT`         | `Int32`                           |
+| `BIGINT`      | `Int64`                           |
+| `REAL`        | `Float64`                         |
+| `DOUBLE`      | `Float64`                         |
+| `BOOLEAN`     | `Boolean`                         |
+| `DATE`        | `Date32`                          |
+| `TIME`        | `Time64(TimeUnit::Millisecond)`   |
+| `TIMESTAMP`   | `Timestamp(TimeUnit::Nanosecond)` |
+| `INTERVAL`    | _Not yet supported_               |
+| `REGCLASS`    | _Not yet supported_               |
+| `TEXT`        | _Not yet supported_               |
+| `BYTEA`       | _Not yet supported_               |
+| `CUSTOM`      | _Not yet supported_               |
+| `ARRAY`       | _Not yet supported_               |

 # Architecture Overview

datafusion/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { version = "4.0", features = ["prettyprint"] }
-parquet = { version = "4.0", features = ["arrow"] }
+arrow = { version = "4.3", features = ["prettyprint"] }
+parquet = { version = "4.3", features = ["arrow"] }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"

datafusion/src/physical_plan/datetime_expressions.rs

Lines changed: 31 additions & 1 deletion
@@ -25,7 +25,10 @@ use crate::{
 };
 use arrow::{
     array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
-    datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
+    datatypes::{
+        ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
+        TimestampNanosecondType, TimestampSecondType,
+    },
 };
 use arrow::{
     array::{
@@ -268,6 +271,33 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     )
 }

+/// to_timestamp_millis SQL function
+pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000),
+        "to_timestamp_millis",
+    )
+}
+
+/// to_timestamp_micros SQL function
+pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000),
+        "to_timestamp_micros",
+    )
+}
+
+/// to_timestamp_seconds SQL function
+pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    handle::<TimestampSecondType, _, TimestampSecondType>(
+        args,
+        |s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000_000),
+        "to_timestamp_seconds",
+    )
+}
+
 /// Create an implementation of `now()` that always returns the
 /// specified timestamp.
 ///
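
The three new functions above all parse the string to nanoseconds and then divide down to the target unit. A minimal standalone sketch of that scaling step follows; the `Unit` enum and `scale_nanos` helper are hypothetical names for illustration and are not part of DataFusion or Arrow.

```rust
/// Hypothetical stand-in for the four timestamp resolutions touched by this commit.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Unit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Scale a nanoseconds-since-epoch value down to `unit` with integer division,
/// using the same divisors as the closures in the diff.
fn scale_nanos(nanos: i64, unit: Unit) -> i64 {
    let divisor: i64 = match unit {
        Unit::Second => 1_000_000_000,
        Unit::Millisecond => 1_000_000,
        Unit::Microsecond => 1_000,
        Unit::Nanosecond => 1,
    };
    // Rust integer division truncates toward zero, so sub-unit precision is dropped.
    nanos / divisor
}
```

For example, `scale_nanos(1_500_000_000, Unit::Millisecond)` yields `1_500`. Because `/` on `i64` truncates toward zero, a pre-epoch value such as `-1_500_000_000` scales to `-1` seconds rather than `-2`; whether that rounding matters depends on the caller.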

datafusion/src/physical_plan/expressions/binary.rs

Lines changed: 19 additions & 0 deletions
@@ -17,6 +17,7 @@

 use std::{any::Any, sync::Arc};

+use arrow::array::TimestampMillisecondArray;
 use arrow::array::*;
 use arrow::compute::kernels::arithmetic::{
     add, divide, divide_scalar, multiply, subtract,
@@ -256,6 +257,15 @@ macro_rules! binary_array_op_scalar {
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, None) => {
+                compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+            }
             DataType::Date32 => {
                 compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
             }
@@ -288,6 +298,15 @@ macro_rules! binary_array_op {
             DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                 compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
             }
+            DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, None) => {
+                compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
+            }
             DataType::Date32 => {
                 compute_op!($LEFT, $RIGHT, $OP, Date32Array)
             }
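
The arms added above extend a per-type dispatch so that each timestamp unit selects its own array type. The sketch below models that dispatch with toy types to show why one arm per unit is needed. `TimeUnit` and `DataType` here are simplified local enums, `array_name_for` is a hypothetical helper, and the error fallthrough is an assumption, since the real macros' fallback arm is not visible in this diff.

```rust
/// Simplified local stand-in for Arrow's time units (not the real arrow type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

/// Simplified local stand-in for a few Arrow data types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Timestamp(TimeUnit),
    Date32,
    Utf8,
}

/// Pick the concrete array type name for a data type, one arm per supported type.
/// A type without its own arm falls through to the error case (assumed behavior).
fn array_name_for(data_type: &DataType) -> Result<&'static str, String> {
    match data_type {
        DataType::Timestamp(TimeUnit::Nanosecond) => Ok("TimestampNanosecondArray"),
        DataType::Timestamp(TimeUnit::Microsecond) => Ok("TimestampMicrosecondArray"),
        DataType::Timestamp(TimeUnit::Millisecond) => Ok("TimestampMillisecondArray"),
        DataType::Timestamp(TimeUnit::Second) => Ok("TimestampSecondArray"),
        DataType::Date32 => Ok("Date32Array"),
        other => Err(format!("unsupported data type: {:?}", other)),
    }
}
```

In this model, removing the `Millisecond` arm would route millisecond timestamps to the error case, which is the kind of gap the new arms close for binary operations on the added timestamp units.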

datafusion/src/physical_plan/expressions/cast.rs

Lines changed: 20 additions & 18 deletions
@@ -91,24 +91,26 @@ impl PhysicalExpr for CastExpr {

     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
         let value = self.expr.evaluate(batch)?;
-        match value {
-            ColumnarValue::Array(array) => {
-                Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
-                    &array,
-                    &self.cast_type,
-                    &self.cast_options,
-                )?))
-            }
-            ColumnarValue::Scalar(scalar) => {
-                let scalar_array = scalar.to_array();
-                let cast_array = kernels::cast::cast_with_options(
-                    &scalar_array,
-                    &self.cast_type,
-                    &self.cast_options,
-                )?;
-                let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
-                Ok(ColumnarValue::Scalar(cast_scalar))
-            }
+        cast_column(&value, &self.cast_type, &self.cast_options)
+    }
+}
+
+/// Internal cast function for casting ColumnarValue -> ColumnarValue for cast_type
+pub fn cast_column(
+    value: &ColumnarValue,
+    cast_type: &DataType,
+    cast_options: &CastOptions,
+) -> Result<ColumnarValue> {
+    match value {
+        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(
+            kernels::cast::cast_with_options(array, cast_type, cast_options)?,
+        )),
+        ColumnarValue::Scalar(scalar) => {
+            let scalar_array = scalar.to_array();
+            let cast_array =
+                kernels::cast::cast_with_options(&scalar_array, cast_type, cast_options)?;
+            let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
+            Ok(ColumnarValue::Scalar(cast_scalar))
         }
     }
 }
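
The extracted `cast_column` above handles scalars by wrapping them in a one-element array, running the array cast, and reading element 0 back out. The sketch below reproduces that shape with toy types so it compiles without Arrow. The local `ColumnarValue`, the `cast_array` kernel (here a nanosecond-to-millisecond division), and `cast_column_sketch` are all assumptions for illustration, not the DataFusion API.

```rust
/// Toy stand-in for DataFusion's ColumnarValue: either a whole column or one value.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Vec<i64>),
    Scalar(i64),
}

/// Hypothetical array-only cast kernel; here it scales nanoseconds to milliseconds.
fn cast_array(values: &[i64]) -> Result<Vec<i64>, String> {
    Ok(values.iter().map(|v| v / 1_000_000).collect())
}

/// Cast either variant by reusing the array kernel for both paths.
fn cast_column_sketch(value: &ColumnarValue) -> Result<ColumnarValue, String> {
    match value {
        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(cast_array(array)?)),
        ColumnarValue::Scalar(scalar) => {
            // Wrap the scalar in a one-element array so the same kernel applies.
            let scalar_array = vec![*scalar];
            let cast = cast_array(&scalar_array)?;
            // Read the single result back out as a scalar.
            let first = cast
                .first()
                .copied()
                .ok_or_else(|| "cast produced an empty array".to_string())?;
            Ok(ColumnarValue::Scalar(first))
        }
    }
}
```

Routing scalars through the array kernel keeps a single cast implementation, at the cost of a small temporary allocation per scalar. Pulling the logic into a free function is what lets callers other than `CastExpr::evaluate` reuse it.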

datafusion/src/physical_plan/expressions/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -49,7 +49,9 @@ mod try_cast;
 pub use average::{avg_return_type, Avg, AvgAccumulator};
 pub use binary::{binary, binary_operator_data_type, BinaryExpr};
 pub use case::{case, CaseExpr};
-pub use cast::{cast, cast_with_options, CastExpr};
+pub use cast::{
+    cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
+};
 pub use column::{col, Column};
 pub use count::Count;
 pub use in_list::{in_list, InListExpr};

datafusion/src/physical_plan/expressions/nullif.rs

Lines changed: 1 addition & 5 deletions
@@ -21,11 +21,7 @@ use super::ColumnarValue;
 use crate::error::{DataFusionError, Result};
 use crate::scalar::ScalarValue;
 use arrow::array::Array;
-use arrow::array::{
-    ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray,
-    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-};
+use arrow::array::*;
 use arrow::compute::kernels::boolean::nullif;
 use arrow::compute::kernels::comparison::{eq, eq_scalar, eq_utf8, eq_utf8_scalar};
 use arrow::datatypes::{DataType, TimeUnit};