2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/Cargo.toml
@@ -51,6 +51,7 @@ datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
 datafusion-comet-proto = { path = "proto", version = "0.5.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
+futures = "0.3.28"
 num = "0.4"
 rand = "0.8"
 regex = "1.9.6"
7 changes: 1 addition & 6 deletions native/core/Cargo.toml
@@ -42,7 +42,7 @@ arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 parquet = { workspace = true, default-features = false, features = ["experimental"] }
 half = { version = "2.4.1", default-features = false }
-futures = "0.3.28"
+futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 async-trait = "0.1"
@@ -88,7 +88,6 @@ hex = "0.4.3"
 
 [features]
 default = []
-nightly = []
 
 [lib]
 name = "comet"
@@ -123,10 +122,6 @@ harness = false
 name = "filter"
 harness = false
 
-[[bench]]
-name = "aggregate"
-harness = false
-
 [[bench]]
 name = "bloom_filter_agg"
 harness = false
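
Since `futures` is now declared once in the workspace manifest (see the `native/Cargo.toml` hunk above), `futures = { workspace = true }` makes the core crate inherit that single `0.3.28` pin instead of carrying its own copy, so the two crates cannot drift to different versions.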
6 changes: 2 additions & 4 deletions native/core/src/common/bit.rs
@@ -17,14 +17,12 @@
 
 use std::{cmp::min, mem::size_of};
 
-use arrow::buffer::Buffer;
-
 use crate::{
     errors::CometResult as Result,
-    likely,
     parquet::{data_type::AsBytes, util::bit_packing::unpack32},
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::{likely, unlikely};
 
 #[inline]
 pub fn from_ne_slice<T: FromBytes>(bs: &[u8]) -> T {
native/core/src/execution/datafusion/expressions/checkoverflow.rs
@@ -27,8 +27,7 @@ use arrow::{
     datatypes::{Decimal128Type, DecimalType},
     record_batch::RecordBatch,
 };
-use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_schema::{DataType, Schema, DECIMAL128_MAX_PRECISION};
+use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue};
@@ -172,15 +171,3 @@ impl PhysicalExpr for CheckOverflow {
         self.hash(&mut s);
     }
 }
-
-/// Adapted from arrow-rs `validate_decimal_precision` but returns bool
-/// instead of Err to avoid the cost of formatting the error strings and is
-/// optimized to remove a memcpy that exists in the original function
-/// we can remove this code once we upgrade to a version of arrow-rs that
-/// includes https://github.com/apache/arrow-rs/pull/6419
-#[inline]
-pub fn is_valid_decimal_precision(value: i128, precision: u8) -> bool {
-    precision <= DECIMAL128_MAX_PRECISION
-        && value >= MIN_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-        && value <= MAX_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-}
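
The deleted `is_valid_decimal_precision` helper is not gone: it reappears in the spark-expr crate and is imported further down as `crate::utils::is_valid_decimal_precision`. A minimal sketch of the call pattern from outside the crate, assuming only the `utils` path visible in this diff; the `checked_decimal_sum` wrapper itself is hypothetical:

```rust
use datafusion_comet_spark_expr::utils::is_valid_decimal_precision;

/// Hypothetical wrapper: sum i128 decimal values, bailing out as soon as a
/// partial sum no longer fits Decimal128 at the declared precision.
fn checked_decimal_sum(values: &[i128], precision: u8) -> Option<i128> {
    let mut acc: i128 = 0;
    for v in values {
        acc = acc.checked_add(*v)?;
        if !is_valid_decimal_precision(acc, precision) {
            return None;
        }
    }
    Some(acc)
}
```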
11 changes: 0 additions & 11 deletions native/core/src/execution/datafusion/expressions/mod.rs
@@ -17,26 +17,15 @@
 
 //! Native DataFusion expressions
 
-pub mod bitwise_not;
 pub mod checkoverflow;
-mod normalize_nan;
-pub use normalize_nan::NormalizeNaNAndZero;
 
 use crate::errors::CometError;
-pub mod avg;
-pub mod avg_decimal;
 pub mod bloom_filter_agg;
 pub mod bloom_filter_might_contain;
-pub mod comet_scalar_funcs;
-pub mod correlation;
-pub mod covariance;
 pub mod negative;
-pub mod stddev;
 pub mod strings;
 pub mod subquery;
-pub mod sum_decimal;
 pub mod unbound;
-pub mod variance;
 
 pub use datafusion_comet_spark_expr::{EvalMode, SparkError};
 
18 changes: 5 additions & 13 deletions native/core/src/execution/datafusion/planner.rs
@@ -18,29 +18,19 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 use super::expressions::EvalMode;
-use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
 use crate::execution::operators::{CopyMode, FilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
         datafusion::{
             expressions::{
-                avg::Avg,
-                avg_decimal::AvgDecimal,
-                bitwise_not::BitwiseNotExpr,
                 bloom_filter_agg::BloomFilterAgg,
                 bloom_filter_might_contain::BloomFilterMightContain,
                 checkoverflow::CheckOverflow,
-                correlation::Correlation,
-                covariance::Covariance,
                 negative,
-                stddev::Stddev,
                 strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr},
                 subquery::Subquery,
-                sum_decimal::SumDecimal,
                 unbound::UnboundColumn,
-                variance::Variance,
-                NormalizeNaNAndZero,
             },
             operators::expand::CometExpandExec,
             shuffle_writer::ShuffleWriterExec,
@@ -82,6 +72,7 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
+use datafusion_comet_spark_expr::create_comet_physical_fun;
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
@@ -99,9 +90,10 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    ArrayInsert, Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField,
-    HourExpr, IfExpr, ListExtract, MinuteExpr, RLike, SecondExpr, SparkCastOptions,
-    TimestampTruncExpr, ToJson,
+    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Correlation, Covariance, CreateNamedStruct,
+    DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr, ListExtract, MinuteExpr,
+    NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, Stddev, SumDecimal,
+    TimestampTruncExpr, ToJson, Variance,
 };
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
27 changes: 0 additions & 27 deletions native/core/src/lib.rs
@@ -104,30 +104,3 @@ fn default_logger_config() -> CometResult<Config> {
         .build(root)
         .map_err(|err| CometError::Config(err.to_string()))
 }
-
-// These are borrowed from hashbrown crate:
-// https://github.com/rust-lang/hashbrown/blob/master/src/raw/mod.rs
-
-// On stable we can use #[cold] to get a equivalent effect: this attributes
-// suggests that the function is unlikely to be called
-#[cfg(not(feature = "nightly"))]
-#[inline]
-#[cold]
-fn cold() {}
-
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn likely(b: bool) -> bool {
-    if !b {
-        cold();
-    }
-    b
-}
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn unlikely(b: bool) -> bool {
-    if b {
-        cold();
-    }
-    b
-}
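
The stable-Rust `likely`/`unlikely` shims move to `datafusion_comet_spark_expr::utils` (and core's now-unused `nightly` feature is dropped), which the `bit.rs` and Parquet reader hunks import below. A minimal sketch of how such a hint is used on a hot path, assuming that new import path; `read_word` is a hypothetical helper:

```rust
use datafusion_comet_spark_expr::utils::unlikely;

/// Hypothetical hot-path reader: the truncated-input branch is marked cold,
/// so the compiler lays out the common eight-byte case first.
fn read_word(buf: &[u8]) -> u64 {
    if unlikely(buf.len() < 8) {
        let mut tmp = [0u8; 8];
        tmp[..buf.len()].copy_from_slice(buf);
        return u64::from_le_bytes(tmp);
    }
    u64::from_le_bytes(buf[..8].try_into().unwrap())
}
```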
7 changes: 3 additions & 4 deletions native/core/src/parquet/read/levels.rs
@@ -17,15 +17,14 @@
 
 use std::mem;
 
-use arrow::buffer::Buffer;
-use parquet::schema::types::ColumnDescPtr;
-
 use super::values::Decoder;
 use crate::{
     common::bit::{self, read_u32, BitReader},
     parquet::ParquetMutableVector,
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::unlikely;
+use parquet::schema::types::ColumnDescPtr;
 
 const INITIAL_BUF_LEN: usize = 16;
 
2 changes: 1 addition & 1 deletion native/core/src/parquet/read/values.rs
@@ -28,9 +28,9 @@ use crate::write_val_or_null;
 use crate::{
     common::bit::{self, BitReader},
     parquet::{data_type::*, ParquetMutableVector},
-    unlikely,
 };
 use arrow::datatypes::DataType as ArrowDataType;
+use datafusion_comet_spark_expr::utils::unlikely;
 
 pub fn get_decoder<T: DataType>(
     value_data: Buffer,
8 changes: 8 additions & 0 deletions native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
 [dependencies]
 arrow = { workspace = true }
 arrow-array = { workspace = true }
+arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 chrono = { workspace = true }
 datafusion = { workspace = true }
@@ -39,12 +40,14 @@ chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }
 thiserror = { workspace = true }
+futures = { workspace = true }
 twox-hash = "2.0.0"
 
 [dev-dependencies]
 arrow-data = {workspace = true}
 criterion = "0.5.1"
 rand = { workspace = true}
+tokio = { version = "1", features = ["rt-multi-thread"] }
 
 
 [lib]
@@ -66,3 +69,8 @@ harness = false
 [[bench]]
 name = "decimal_div"
 harness = false
+
+[[bench]]
+name = "aggregate"
+harness = false
+
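With the `aggregate` bench target declared here and removed from `native/core/Cargo.toml`, the benchmark now runs from the spark-expr crate, e.g. `cargo bench --bench aggregate` inside `native/spark-expr`.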
native/{core → spark-expr}/benches/aggregate.rs
@@ -19,16 +19,16 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow_array::builder::{Decimal128Builder, StringBuilder};
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
-use comet::execution::datafusion::expressions::avg_decimal::AvgDecimal;
-use comet::execution::datafusion::expressions::sum_decimal::SumDecimal;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use datafusion::execution::TaskContext;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion_execution::TaskContext;
+use datafusion_comet_spark_expr::AvgDecimal;
+use datafusion_comet_spark_expr::SumDecimal;
 use datafusion_expr::AggregateUDF;
 use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::Column;
@@ -28,7 +28,7 @@ use datafusion_common::{not_impl_err, Result, ScalarValue};
 use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::expressions::checkoverflow::is_valid_decimal_precision;
+use crate::utils::is_valid_decimal_precision;
 use arrow_array::ArrowNativeTypeOp;
 use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
 use datafusion::logical_expr::Volatility::Immutable;
native/{core/src/execution/datafusion/expressions → spark-expr/src}/bitwise_not.rs
@@ -28,7 +28,7 @@ use arrow::{
 };
 use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
 use datafusion_physical_expr::PhysicalExpr;
 
 macro_rules! compute_op {
@@ -135,22 +135,6 @@ pub fn bitwise_not(arg: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>
     Ok(Arc::new(BitwiseNotExpr::new(arg)))
 }
 
-fn scalar_bitwise_not(scalar: ScalarValue) -> Result<ScalarValue> {
-    match scalar {
-        ScalarValue::Int8(None)
-        | ScalarValue::Int16(None)
-        | ScalarValue::Int32(None)
-        | ScalarValue::Int64(None) => Ok(scalar),
-        ScalarValue::Int8(Some(v)) => Ok(ScalarValue::Int8(Some(!v))),
-        ScalarValue::Int16(Some(v)) => Ok(ScalarValue::Int16(Some(!v))),
-        ScalarValue::Int32(Some(v)) => Ok(ScalarValue::Int32(Some(!v))),
-        ScalarValue::Int64(Some(v)) => Ok(ScalarValue::Int64(Some(!v))),
-        value => Err(DataFusionError::Internal(format!(
-            "Can not run ! on scalar value {value:?}"
-        ))),
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::*;
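
With the unused `scalar_bitwise_not` helper dropped (its `ScalarValue` import goes with it), the file's public surface is the `BitwiseNotExpr` struct and the `bitwise_not` constructor, both re-exported from the crate root in the `lib.rs` hunk below. A minimal sketch of building the expression through the new path, using only the signature visible in the hunk header above:

```rust
use std::sync::Arc;

use datafusion_comet_spark_expr::bitwise_not;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;

/// Build `~a` over column "a" at index 0; the returned expression is then
/// evaluated per RecordBatch like any other DataFusion PhysicalExpr.
fn not_of_column_a() -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
    bitwise_not(Arc::new(Column::new("a", 0)))
}
```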
native/{core/src/execution/datafusion/expressions → spark-expr/src}/comet_scalar_funcs.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_schema::DataType;
-use datafusion_comet_spark_expr::scalar_funcs::hash_expressions::{
+use crate::scalar_funcs::hash_expressions::{
     spark_sha224, spark_sha256, spark_sha384, spark_sha512,
 };
-use datafusion_comet_spark_expr::scalar_funcs::{
+use crate::scalar_funcs::{
     spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div, spark_floor, spark_hex,
     spark_isnan, spark_make_decimal, spark_murmur3_hash, spark_read_side_padding, spark_round,
     spark_unhex, spark_unscaled_value, spark_xxhash64, SparkChrFunc,
 };
+use arrow_schema::DataType;
 use datafusion_common::{DataFusionError, Result as DataFusionResult};
 use datafusion_expr::registry::FunctionRegistry;
 use datafusion_expr::{
native/{core/src/execution/datafusion/expressions → spark-expr/src}/correlation.rs
@@ -19,9 +19,8 @@ use arrow::compute::{and, filter, is_not_null};
 
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::expressions::{
-    covariance::CovarianceAccumulator, stddev::StddevAccumulator,
-};
+use crate::covariance::CovarianceAccumulator;
+use crate::stddev::StddevAccumulator;
 use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Field},
20 changes: 20 additions & 0 deletions native/spark-expr/src/lib.rs
@@ -23,18 +23,38 @@ mod cast;
 mod error;
 mod if_expr;
 
+mod avg;
+pub use avg::Avg;
+mod bitwise_not;
+pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
+mod avg_decimal;
+pub use avg_decimal::AvgDecimal;
+mod correlation;
+pub use correlation::Correlation;
+mod covariance;
+pub use covariance::Covariance;
 mod kernels;
 mod list;
 mod regexp;
 pub mod scalar_funcs;
 pub mod spark_hash;
+mod stddev;
+pub use stddev::Stddev;
 mod structs;
+mod sum_decimal;
+pub use sum_decimal::SumDecimal;
+mod normalize_nan;
 mod temporal;
 pub mod timezone;
 mod to_json;
 pub mod utils;
+pub use normalize_nan::NormalizeNaNAndZero;
 
+mod variance;
+pub use variance::Variance;
+mod comet_scalar_funcs;
 pub use cast::{spark_cast, Cast, SparkCastOptions};
+pub use comet_scalar_funcs::create_comet_physical_fun;
 pub use error::{SparkError, SparkResult};
 pub use if_expr::IfExpr;
 pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
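
Taken together, the lib.rs additions mean that everything the core planner previously reached through `comet::execution::datafusion::expressions::*` now resolves from the spark-expr crate root, matching the import rewrites above:

```rust
// Each of these paths is introduced by this PR's `pub use` lines.
use datafusion_comet_spark_expr::{
    bitwise_not, create_comet_physical_fun, Avg, AvgDecimal, BitwiseNotExpr,
    Correlation, Covariance, NormalizeNaNAndZero, Stddev, SumDecimal, Variance,
};
```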