Merged
9 changes: 8 additions & 1 deletion DEVELOPERS.md
@@ -33,6 +33,13 @@ DataFusion is written in Rust and it uses a standard rust toolkit:
- `cargo test` to test
- etc.

Testing setup:

- `git submodule init`
- `git submodule update`
- `export PARQUET_TEST_DATA=parquet_testing/`
- `export ARROW_TEST_DATA=testing/data/`

## How to add a new scalar function

Below is a checklist of what you need to do to add a new scalar function to DataFusion:
@@ -47,7 +54,7 @@ Below is a checklist of what you need to do to add a new scalar function to DataFusion:
- a new entry to `FromStr` with the name of the function as called by SQL
- a new line in `return_type` with the expected return type of the function, given an incoming type
- a new line in `signature` with the signature of the function (number and types of its arguments)
- a new line in `create_physical_expr` mapping the built-in to the implementation
- a new line in `create_physical_expr`/`create_physical_fun` mapping the built-in to the implementation
- tests for the function.
- In [tests/sql.rs](datafusion/tests/sql.rs), add a new test where the function is called through SQL against well known data and returns the expected result.
- In [src/logical_plan/expr](datafusion/src/logical_plan/expr.rs), add:
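Taken together, the checklist amounts to one new enum variant plus a handful of match arms that the SQL planner consults. Below is a minimal, self-contained sketch of that pattern; the types and signatures are illustrative stand-ins, not DataFusion's actual ones.

```rust
use std::str::FromStr;

// Stand-in for DataFusion's BuiltinScalarFunction enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BuiltinScalarFunction {
    Sqrt,
    MyFunc, // 1. the new function gets its own variant
}

impl FromStr for BuiltinScalarFunction {
    type Err = String;

    // 2. map the name used in SQL to the variant (the `FromStr` entry)
    fn from_str(name: &str) -> Result<Self, Self::Err> {
        match name {
            "sqrt" => Ok(Self::Sqrt),
            "my_func" => Ok(Self::MyFunc),
            other => Err(format!("unknown scalar function: {}", other)),
        }
    }
}

// Placeholder implementation for the new function.
fn my_func_impl(x: f64) -> f64 {
    x + 1.0
}

impl BuiltinScalarFunction {
    // 3. analogue of `return_type`/`signature`: what the function accepts and returns
    fn returns_float(&self) -> bool {
        matches!(self, Self::Sqrt | Self::MyFunc)
    }

    // 4. analogue of `create_physical_expr`/`create_physical_fun`:
    //    map the variant to its implementation
    fn physical_fun(&self) -> fn(f64) -> f64 {
        match self {
            Self::Sqrt => f64::sqrt,
            Self::MyFunc => my_func_impl,
        }
    }
}

fn main() {
    let f: BuiltinScalarFunction = "my_func".parse().unwrap();
    assert!(f.returns_float());
    assert_eq!((f.physical_fun())(41.0), 42.0);
}
```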
54 changes: 29 additions & 25 deletions README.md
@@ -197,6 +197,10 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] Basic date functions
- [ ] Basic time functions
- [x] Basic timestamp functions
- [x] [to_timestamp](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp)
- [x] [to_timestamp_millis](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_millis)
- [x] [to_timestamp_micros](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_micros)
- [x] [to_timestamp_seconds](docs/user-guide/book/sql/datafusion-functions.html#to_timestamp_seconds)
- nested functions
- [x] Array of columns
- [x] Schema Queries
@@ -320,31 +324,31 @@ execution. The SQL types from
[sqlparser-rs](https://github.com/ballista-compute/sqlparser-rs/blob/main/src/ast/data_type.rs#L57)
are mapped to Arrow types according to the following table

| SQL Data Type | Arrow DataType |
| ------------- | ------------------------------- |
| `CHAR` | `Utf8` |
| `VARCHAR` | `Utf8` |
| `UUID` | _Not yet supported_ |
| `CLOB` | _Not yet supported_ |
| `BINARY` | _Not yet supported_ |
| `VARBINARY` | _Not yet supported_ |
| `DECIMAL` | `Float64` |
| `FLOAT` | `Float32` |
| `SMALLINT` | `Int16` |
| `INT` | `Int32` |
| `BIGINT` | `Int64` |
| `REAL` | `Float64` |
| `DOUBLE` | `Float64` |
| `BOOLEAN` | `Boolean` |
| `DATE` | `Date32` |
| `TIME` | `Time64(TimeUnit::Millisecond)` |
| `TIMESTAMP` | `Date64` |
| `INTERVAL` | _Not yet supported_ |
| `REGCLASS` | _Not yet supported_ |
| `TEXT` | _Not yet supported_ |
| `BYTEA` | _Not yet supported_ |
| `CUSTOM` | _Not yet supported_ |
| `ARRAY` | _Not yet supported_ |
| SQL Data Type | Arrow DataType |
| ------------- | --------------------------------- |
| `CHAR` | `Utf8` |
| `VARCHAR` | `Utf8` |
| `UUID` | _Not yet supported_ |
| `CLOB` | _Not yet supported_ |
| `BINARY` | _Not yet supported_ |
| `VARBINARY` | _Not yet supported_ |
| `DECIMAL` | `Float64` |
| `FLOAT` | `Float32` |
| `SMALLINT` | `Int16` |
| `INT` | `Int32` |
| `BIGINT` | `Int64` |
| `REAL` | `Float64` |
| `DOUBLE` | `Float64` |
| `BOOLEAN` | `Boolean` |
| `DATE` | `Date32` |
| `TIME` | `Time64(TimeUnit::Millisecond)` |
| `TIMESTAMP` | `Timestamp(TimeUnit::Nanosecond)` |
| `INTERVAL` | _Not yet supported_ |
| `REGCLASS` | _Not yet supported_ |
| `TEXT` | _Not yet supported_ |
| `BYTEA` | _Not yet supported_ |
| `CUSTOM` | _Not yet supported_ |
| `ARRAY` | _Not yet supported_ |
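
The most visible change in this table is the `TIMESTAMP` row: a SQL `TIMESTAMP` now maps to an Arrow nanosecond timestamp instead of `Date64`. As a small sketch of the two Arrow types involved (assumes the arrow 4.x crate as a dependency):

```rust
use arrow::datatypes::{DataType, TimeUnit};

fn main() {
    // mapping before this change
    let old = DataType::Date64;
    // mapping after this change: nanosecond precision, no time zone
    let new = DataType::Timestamp(TimeUnit::Nanosecond, None);
    println!("SQL TIMESTAMP: {:?} -> {:?}", old, new);
}
```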

# Architecture Overview

4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
arrow = { version = "4.0", features = ["prettyprint"] }
parquet = { version = "4.0", features = ["arrow"] }
arrow = { version = "4.3", features = ["prettyprint"] }
parquet = { version = "4.3", features = ["arrow"] }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
32 changes: 31 additions & 1 deletion datafusion/src/physical_plan/datetime_expressions.rs
@@ -25,7 +25,10 @@ use crate::{
};
use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
datatypes::{
ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
},
};
use arrow::{
array::{
@@ -268,6 +271,33 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> Result<ColumnarValue> {
)
}

/// to_timestamp_millis SQL function
pub fn to_timestamp_millis(args: &[ColumnarValue]) -> Result<ColumnarValue> {
handle::<TimestampMillisecondType, _, TimestampMillisecondType>(
args,
|s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000),
"to_timestamp_millis",
)
}

/// to_timestamp_micros SQL function
pub fn to_timestamp_micros(args: &[ColumnarValue]) -> Result<ColumnarValue> {
handle::<TimestampMicrosecondType, _, TimestampMicrosecondType>(
args,
|s| string_to_timestamp_nanos(s).map(|n| n / 1_000),
"to_timestamp_micros",
)
}

/// to_timestamp_seconds SQL function
pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
handle::<TimestampSecondType, _, TimestampSecondType>(
args,
|s| string_to_timestamp_nanos(s).map(|n| n / 1_000_000_000),
"to_timestamp_seconds",
)
}

/// Create an implementation of `now()` that always returns the
/// specified timestamp.
///
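All three new functions reuse `string_to_timestamp_nanos` and then rescale the parsed nanosecond value to the target unit by integer division. A standalone sketch of that rescaling in plain Rust (the epoch value below is only an example):

```rust
fn main() {
    // 2021-06-01T00:00:00Z expressed as nanoseconds since the Unix epoch
    let nanos: i64 = 1_622_505_600_000_000_000;

    // the divisors used by the three new functions
    let millis = nanos / 1_000_000; // to_timestamp_millis
    let micros = nanos / 1_000; // to_timestamp_micros
    let seconds = nanos / 1_000_000_000; // to_timestamp_seconds

    assert_eq!(millis, 1_622_505_600_000);
    assert_eq!(micros, 1_622_505_600_000_000);
    assert_eq!(seconds, 1_622_505_600);
    println!("{} ms, {} us, {} s", millis, micros, seconds);
}
```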
19 changes: 19 additions & 0 deletions datafusion/src/physical_plan/expressions/binary.rs
@@ -17,6 +17,7 @@

use std::{any::Any, sync::Arc};

use arrow::array::TimestampMillisecondArray;
use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, divide, divide_scalar, multiply, subtract,
@@ -256,6 +257,15 @@ macro_rules! binary_array_op_scalar {
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
DataType::Timestamp(TimeUnit::Second, None) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
compute_op_scalar!($LEFT, $RIGHT, $OP, Date32Array)
}
@@ -288,6 +298,15 @@ macro_rules! binary_array_op {
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
DataType::Timestamp(TimeUnit::Second, None) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
compute_op!($LEFT, $RIGHT, $OP, Date32Array)
}
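These new match arms let binary comparison expressions dispatch to the array type that matches each `TimeUnit`, so predicates can now be evaluated over millisecond, microsecond, and second timestamps as well as nanoseconds. A rough usage sketch at the arrow level follows; it assumes arrow's generic comparison kernels accept timestamp arrays, which is an assumption and not something this diff shows directly:

```rust
use arrow::array::TimestampMillisecondArray;
use arrow::compute::kernels::comparison::gt_scalar;
use arrow::error::Result;

fn main() -> Result<()> {
    // three instants, in milliseconds since the Unix epoch
    let ts = TimestampMillisecondArray::from(vec![1_000, 2_000, 3_000]);

    // compare the whole column against a scalar millisecond value;
    // this is the kind of operation the new Millisecond arm covers
    let mask = gt_scalar(&ts, 1_500)?;
    assert!(!mask.value(0));
    assert!(mask.value(1));
    assert!(mask.value(2));
    Ok(())
}
```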
38 changes: 20 additions & 18 deletions datafusion/src/physical_plan/expressions/cast.rs
@@ -91,24 +91,26 @@ impl PhysicalExpr for CastExpr {

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let value = self.expr.evaluate(batch)?;
match value {
ColumnarValue::Array(array) => {
Ok(ColumnarValue::Array(kernels::cast::cast_with_options(
&array,
&self.cast_type,
&self.cast_options,
)?))
}
ColumnarValue::Scalar(scalar) => {
let scalar_array = scalar.to_array();
let cast_array = kernels::cast::cast_with_options(
&scalar_array,
&self.cast_type,
&self.cast_options,
)?;
let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
Ok(ColumnarValue::Scalar(cast_scalar))
}
cast_column(&value, &self.cast_type, &self.cast_options)
}
}

/// Internal cast function for casting a `ColumnarValue` to the given `cast_type`
pub fn cast_column(
value: &ColumnarValue,
cast_type: &DataType,
cast_options: &CastOptions,
) -> Result<ColumnarValue> {
match value {
ColumnarValue::Array(array) => Ok(ColumnarValue::Array(
kernels::cast::cast_with_options(array, cast_type, cast_options)?,
)),
ColumnarValue::Scalar(scalar) => {
let scalar_array = scalar.to_array();
let cast_array =
kernels::cast::cast_with_options(&scalar_array, cast_type, cast_options)?;
let cast_scalar = ScalarValue::try_from_array(&cast_array, 0)?;
Ok(ColumnarValue::Scalar(cast_scalar))
}
}
}
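Pulling the body of `evaluate` out into a public `cast_column` function makes the same cast logic callable without constructing a `CastExpr` first. A hypothetical call is sketched below; `cast_column` and `DEFAULT_DATAFUSION_CAST_OPTIONS` come from this diff and the `mod.rs` re-exports in the next file, but the exact import paths are assumptions:

```rust
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::physical_plan::expressions::{cast_column, DEFAULT_DATAFUSION_CAST_OPTIONS};
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    // a scalar value, cast directly without building a CastExpr
    let value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
    let cast = cast_column(&value, &DataType::Int64, &DEFAULT_DATAFUSION_CAST_OPTIONS)?;

    // the scalar branch round-trips through a one-element array and back
    if let ColumnarValue::Scalar(s) = cast {
        assert_eq!(s, ScalarValue::Int64(Some(42)));
    }
    Ok(())
}
```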
4 changes: 3 additions & 1 deletion datafusion/src/physical_plan/expressions/mod.rs
@@ -49,7 +49,9 @@ mod try_cast;
pub use average::{avg_return_type, Avg, AvgAccumulator};
pub use binary::{binary, binary_operator_data_type, BinaryExpr};
pub use case::{case, CaseExpr};
pub use cast::{cast, cast_with_options, CastExpr};
pub use cast::{
cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
};
pub use column::{col, Column};
pub use count::Count;
pub use in_list::{in_list, InListExpr};
6 changes: 1 addition & 5 deletions datafusion/src/physical_plan/expressions/nullif.rs
@@ -21,11 +21,7 @@ use super::ColumnarValue;
use crate::error::{DataFusionError, Result};
use crate::scalar::ScalarValue;
use arrow::array::Array;
use arrow::array::{
ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampNanosecondArray,
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::array::*;
use arrow::compute::kernels::boolean::nullif;
use arrow::compute::kernels::comparison::{eq, eq_scalar, eq_utf8, eq_utf8_scalar};
use arrow::datatypes::{DataType, TimeUnit};