Merged
27 commits
6a8a3a4
Bump DF to latest main commit to test 51.0.0, bump Arrow-rs, cargo up…
mbutrovich Nov 7, 2025
268e31b
Fix "sql" feature dependency for native tests.
mbutrovich Nov 7, 2025
57da62d
Fix float normalize_nan bug?
mbutrovich Nov 7, 2025
eb95400
Remove unused imports.
mbutrovich Nov 7, 2025
5399550
Switch to branch-51, run cargo update.
mbutrovich Nov 11, 2025
c1d8a06
Bump df51.
mbutrovich Nov 17, 2025
d63f5cf
Merge branch 'main' into df51
mbutrovich Nov 17, 2025
ed0c8df
Merge branch 'main' into df51
mbutrovich Nov 19, 2025
69bd15d
Move to crate of df51.
mbutrovich Nov 19, 2025
736ffa9
Merge branch 'main' into df51
mbutrovich Dec 3, 2025
8e10de0
fix prost
mbutrovich Dec 9, 2025
eaa488d
Merge branch 'main' into df51
mbutrovich Dec 9, 2025
4bcba5e
update
mbutrovich Dec 9, 2025
4cbbc3d
Switch to DF51 iceberg-rust branch.
mbutrovich Dec 9, 2025
4c6cbb1
Skip serializing NULL partition values.
mbutrovich Dec 9, 2025
dead41a
fix schema selection
mbutrovich Dec 9, 2025
95bc89c
Fix format.
mbutrovich Dec 9, 2025
304c169
Fix format.
mbutrovich Dec 9, 2025
bbc7501
switch back to main iceberg-rust now that df51 is in there
mbutrovich Dec 10, 2025
05dc2bb
cargo update
mbutrovich Dec 10, 2025
b8576c1
Fix decimal128 partition values.
mbutrovich Dec 10, 2025
1b45c81
Merge branch 'main' into df51
mbutrovich Dec 10, 2025
cabd0c1
go back to iceberg-rust main branch
mbutrovich Dec 10, 2025
9e0bedb
Forgot to run cargo update.
mbutrovich Dec 10, 2025
d6b493a
test null identity-partitioned fix
mbutrovich Dec 10, 2025
961957f
back to main iceberg-rust
mbutrovich Dec 11, 2025
de5ed17
Merge branch 'main' into df51
mbutrovich Dec 11, 2025
569 changes: 270 additions & 299 deletions native/Cargo.lock

Large diffs are not rendered by default.

15 changes: 7 additions & 8 deletions native/Cargo.toml
@@ -31,16 +31,16 @@ license = "Apache-2.0"
edition = "2021"

# Comet uses the same minimum Rust version as DataFusion
-rust-version = "1.86"
+rust-version = "1.88"

[workspace.dependencies]
-arrow = { version = "56.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow = { version = "57.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.10.0" }
-parquet = { version = "56.2.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "50.3.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
-datafusion-datasource = { version = "50.3.0" }
-datafusion-spark = { version = "50.3.0" }
+parquet = { version = "57.0.0", default-features = false, features = ["experimental"] }
+datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
+datafusion-datasource = { version = "51.0.0" }
+datafusion-spark = { version = "51.0.0" }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-proto = { path = "proto" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
@@ -54,8 +54,7 @@ object_store = { version = "0.12.3", features = ["gcp", "azure", "aws", "http"]
url = "2.2"
aws-config = "1.8.10"
aws-credential-types = "1.2.9"
-iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a667539" }
-iceberg-datafusion = { git = "https://github.com/apache/iceberg-rust", rev = "a667539" }
+iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "16906c1" }

[profile.release]
debug = true
4 changes: 2 additions & 2 deletions native/core/Cargo.toml
@@ -59,7 +59,7 @@ bytes = { workspace = true }
tempfile = "3.8.0"
itertools = "0.14.0"
paste = "1.0.14"
-datafusion = { workspace = true, features = ["parquet_encryption"] }
+datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
datafusion-datasource = { workspace = true }
datafusion-spark = { workspace = true }
once_cell = "1.18.0"
@@ -95,7 +95,7 @@ jni = { version = "0.21", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
-datafusion-functions-nested = { version = "50.3.0" }
+datafusion-functions-nested = { version = "51.0.0" }

[features]
backtrace = ["datafusion/backtrace"]
4 changes: 2 additions & 2 deletions native/core/src/execution/planner.rs
@@ -1450,7 +1450,7 @@ impl PhysicalPlanner {
vec![], // No struct columns to unnest
output_schema,
unnest_options,
-));
+)?);

Ok((
scans,
@@ -2234,7 +2234,7 @@ impl PhysicalPlanner {
partition_by,
sort_phy_exprs,
window_frame.into(),
-input_schema.as_ref(),
+input_schema,
false, // TODO: Ignore nulls
false, // TODO: Spark does not support DISTINCT ... OVER
None,
4 changes: 2 additions & 2 deletions native/core/src/parquet/encryption_support.rs
@@ -54,7 +54,7 @@ impl EncryptionFactory for CometEncryptionFactory {
_options: &EncryptionFactoryOptions,
_schema: &SchemaRef,
_file_path: &Path,
-) -> Result<Option<FileEncryptionProperties>, DataFusionError> {
+) -> Result<Option<Arc<FileEncryptionProperties>>, DataFusionError> {
Err(DataFusionError::NotImplemented(
"Comet does not support Parquet encryption yet."
.parse()
@@ -69,7 +69,7 @@ impl EncryptionFactory for CometEncryptionFactory {
&self,
options: &EncryptionFactoryOptions,
file_path: &Path,
-) -> Result<Option<FileDecryptionProperties>, DataFusionError> {
+) -> Result<Option<Arc<FileDecryptionProperties>>, DataFusionError> {
let config: CometEncryptionConfig = options.to_extension_options()?;

let full_path: String = config.uri_base + file_path.as_ref();
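Note on the `Arc` wrapping above: the DataFusion 51 `EncryptionFactory` trait used here now returns `Arc<FileEncryptionProperties>` / `Arc<FileDecryptionProperties>`, which lets the same properties be handed to many concurrent file openers without cloning key material. A dependency-free sketch of that sharing pattern; `KeyMaterial` and `open_reader` are illustrative stand-ins, not Comet or DataFusion APIs:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for FileDecryptionProperties: owns key material that would be
// wasteful to clone once per file or partition.
struct KeyMaterial {
    footer_key: Vec<u8>,
}

// Stand-in consumer: each partitioned read receives a cheap Arc clone,
// not a copy of the key bytes.
fn open_reader(props: Arc<KeyMaterial>, file: &str) {
    println!("opening {file} with a {}-byte key", props.footer_key.len());
}

fn main() {
    let props = Arc::new(KeyMaterial { footer_key: vec![0u8; 16] });
    let handles: Vec<_> = ["a.parquet", "b.parquet"]
        .into_iter()
        .map(|file| {
            let props = Arc::clone(&props);
            thread::spawn(move || open_reader(props, file))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```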
2 changes: 1 addition & 1 deletion native/core/src/parquet/parquet_exec.rs
@@ -122,7 +122,7 @@ pub(crate) fn init_datasource_exec(
object_store_url,
file_source,
)
-.with_projection(Some(projection_vector))
+.with_projection_indices(Some(projection_vector))
comphead (Contributor) commented on Dec 11, 2025:
FYI It was an issue with Q13 projection pushdown with this method

apache/datafusion#19094 (review)

.with_table_partition_cols(partition_fields)
.build()
}
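For readers skimming the rename: `with_projection_indices` takes the ordinals of the columns to read out of the file schema (it replaces `with_projection` in the DataFusion 51 API used here, and the linked apache/datafusion#19094 review describes the Q13 projection-pushdown issue mentioned above). A dependency-free sketch of the projection-by-indices contract; `project_by_indices` and the column names are illustrative, not the DataFusion API:

```rust
// Keep only the columns selected by `indices`, in that order — the contract a
// projection vector handed to a scan builder is expected to follow.
fn project_by_indices<T: Clone>(columns: &[T], indices: &[usize]) -> Vec<T> {
    indices.iter().map(|&i| columns[i].clone()).collect()
}

fn main() {
    let file_schema = ["l_orderkey", "l_quantity", "l_comment", "l_shipdate"];
    // A projection vector of [3, 0] means: emit l_shipdate first, then l_orderkey.
    let projected = project_by_indices(&file_schema, &[3, 0]);
    assert_eq!(projected, vec!["l_shipdate", "l_orderkey"]);
}
```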
16 changes: 8 additions & 8 deletions native/core/src/parquet/read/column.rs
@@ -124,7 +124,7 @@ impl ColumnReader {
match desc.physical_type() {
PhysicalType::BOOLEAN => typed_reader!(BoolColumnReader, Boolean),
PhysicalType::INT32 => {
-if let Some(ref logical_type) = desc.logical_type() {
+if let Some(ref logical_type) = desc.logical_type_ref() {
match logical_type {
lt @ LogicalType::Integer {
bit_width,
@@ -282,7 +282,7 @@ impl ColumnReader {
}
}
PhysicalType::INT64 => {
-if let Some(ref logical_type) = desc.logical_type() {
+if let Some(ref logical_type) = desc.logical_type_ref() {
match logical_type {
lt @ LogicalType::Integer {
bit_width,
@@ -331,19 +331,19 @@ impl ColumnReader {
None
};
match unit {
-ParquetTimeUnit::MILLIS(_) => {
+ParquetTimeUnit::MILLIS => {
typed_reader!(
Int64TimestampMillisColumnReader,
ArrowDataType::Timestamp(time_unit, time_zone)
)
}
-ParquetTimeUnit::MICROS(_) => {
+ParquetTimeUnit::MICROS => {
typed_reader!(
Int64TimestampMicrosColumnReader,
ArrowDataType::Timestamp(time_unit, time_zone)
)
}
-ParquetTimeUnit::NANOS(_) => {
+ParquetTimeUnit::NANOS => {
typed_reader!(
Int64TimestampNanosColumnReader,
ArrowDataType::Int64
@@ -390,7 +390,7 @@ impl ColumnReader {

PhysicalType::DOUBLE => typed_reader!(DoubleColumnReader, Float64),
PhysicalType::BYTE_ARRAY => {
-if let Some(logical_type) = desc.logical_type() {
+if let Some(logical_type) = desc.logical_type_ref() {
match logical_type {
LogicalType::String => typed_reader!(StringColumnReader, Utf8),
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
@@ -403,13 +403,13 @@ impl ColumnReader {
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-if let Some(logical_type) = desc.logical_type() {
+if let Some(logical_type) = desc.logical_type_ref() {
match logical_type {
LogicalType::Decimal {
precision,
scale: _,
} => {
-if !use_decimal_128 && precision <= DECIMAL_MAX_INT_DIGITS {
+if !use_decimal_128 && precision <= &DECIMAL_MAX_INT_DIGITS {
typed_reader!(FLBADecimal32ColumnReader, Int32)
} else if !use_decimal_128
&& promotion_info.precision <= DECIMAL_MAX_LONG_DIGITS
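Two mechanical consequences of the parquet API bump recur through this file: the `logical_type_ref()` accessor presumably hands back a borrowed logical type rather than an owned clone, and matching on a borrowed enum binds its fields by reference, which is why the decimal branch now compares `precision <= &DECIMAL_MAX_INT_DIGITS`. A minimal, dependency-free illustration of that binding rule; the `Logical` enum is a stand-in, not the parquet crate's type:

```rust
// Stand-in for a logical-type enum that we only ever see by reference.
enum Logical {
    Decimal { precision: i32 },
    String,
}

const MAX_INT_DIGITS: i32 = 9;

fn fits_in_int32(lt: &Logical) -> bool {
    match lt {
        // Matching through `&Logical` binds `precision` as a reference, so either
        // dereference it (`*precision <= MAX_INT_DIGITS`) or borrow the constant
        // (`precision <= &MAX_INT_DIGITS`), which is what the diff above does.
        Logical::Decimal { precision } => precision <= &MAX_INT_DIGITS,
        Logical::String => false,
    }
}

fn main() {
    assert!(fits_in_int32(&Logical::Decimal { precision: 7 }));
    assert!(!fits_in_int32(&Logical::Decimal { precision: 18 }));
    assert!(!fits_in_int32(&Logical::String));
}
```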
7 changes: 3 additions & 4 deletions native/core/src/parquet/util/jni.rs
@@ -30,7 +30,6 @@ use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::path::Path;
use parquet::{
basic::{Encoding, LogicalType, TimeUnit, Type as PhysicalType},
-format::{MicroSeconds, MilliSeconds, NanoSeconds},
schema::types::{ColumnDescriptor, ColumnPath, PrimitiveTypeBuilder},
};
use url::{ParseError, Url};
@@ -185,9 +184,9 @@ fn convert_logical_type(

fn convert_time_unit(time_unit: jint) -> TimeUnit {
match time_unit {
-0 => TimeUnit::MILLIS(MilliSeconds::new()),
-1 => TimeUnit::MICROS(MicroSeconds::new()),
-2 => TimeUnit::NANOS(NanoSeconds::new()),
+0 => TimeUnit::MILLIS,
+1 => TimeUnit::MICROS,
+2 => TimeUnit::NANOS,
_ => panic!("Invalid time unit id for Parquet: {time_unit}"),
}
}
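The `TimeUnit` edits here and in `column.rs` above follow the same pattern: the parquet crate version used after this bump drops the payload structs from these variants (note the removed `MicroSeconds`/`MilliSeconds`/`NanoSeconds` imports), so construction loses the `::new()` call and patterns lose the `(_)`. A dependency-free before/after sketch with stand-in enums, not the parquet crate's actual types:

```rust
// Old shape: the variant wrapped an (empty) marker struct, so construction
// looked like `MILLIS(MilliSeconds::new())` and matching needed `MILLIS(_)`.
struct MilliSeconds;
enum OldTimeUnit {
    Millis(MilliSeconds),
}

// New shape: a plain unit variant — constructed and matched as just `MILLIS`.
#[derive(Debug, PartialEq)]
enum NewTimeUnit {
    Millis,
}

fn convert(old: &OldTimeUnit) -> NewTimeUnit {
    match old {
        OldTimeUnit::Millis(_) => NewTimeUnit::Millis, // payload existed only as a marker
    }
}

fn main() {
    assert_eq!(convert(&OldTimeUnit::Millis(MilliSeconds)), NewTimeUnit::Millis);
}
```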
1 change: 1 addition & 0 deletions native/spark-expr/Cargo.toml
@@ -45,6 +45,7 @@ arrow = {workspace = true}
criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] }
rand = { workspace = true}
tokio = { version = "1", features = ["rt-multi-thread"] }
+datafusion = { workspace = true, features = ["sql"] }

[lib]
name = "datafusion_comet_spark_expr"
10 changes: 8 additions & 2 deletions native/spark-expr/src/agg_funcs/avg.rs
@@ -25,8 +25,7 @@ use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion::common::{not_impl_err, Result, ScalarValue};
use datafusion::logical_expr::{
-type_coercion::aggregates::avg_return_type, Accumulator, AggregateUDFImpl, EmitTo,
-GroupsAccumulator, ReversedUDAF, Signature,
+Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature,
};
use datafusion::physical_expr::expressions::format_state_name;
use std::{any::Any, sync::Arc};
@@ -36,6 +35,13 @@ use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion::logical_expr::Volatility::Immutable;
use DataType::*;

+fn avg_return_type(_name: &str, data_type: &DataType) -> Result<DataType> {
+match data_type {
+Float64 => Ok(Float64),
+_ => not_impl_err!("Avg return type for {data_type}"),
+}
+}

/// AVG aggregate expression
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Avg {
19 changes: 17 additions & 2 deletions native/spark-expr/src/agg_funcs/avg_decimal.rs
@@ -32,13 +32,28 @@ use std::{any::Any, sync::Arc};

use crate::utils::{build_bool_state, is_valid_decimal_precision, unlikely};
use arrow::array::ArrowNativeTypeOp;
-use arrow::datatypes::{MAX_DECIMAL128_FOR_EACH_PRECISION, MIN_DECIMAL128_FOR_EACH_PRECISION};
+use arrow::datatypes::{
+DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, MAX_DECIMAL128_FOR_EACH_PRECISION,
+MIN_DECIMAL128_FOR_EACH_PRECISION,
+};
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
-use datafusion::logical_expr::type_coercion::aggregates::avg_return_type;
use datafusion::logical_expr::Volatility::Immutable;
use num::{integer::div_ceil, Integer};
use DataType::*;

+fn avg_return_type(_name: &str, data_type: &DataType) -> Result<DataType> {
+match data_type {
+Decimal128(precision, scale) => {
+// In the spark, the result type is DECIMAL(min(38,precision+4), min(38,scale+4)).
+// Ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala#L66
+let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 4);
+let new_scale = DECIMAL128_MAX_SCALE.min(*scale + 4);
+Ok(Decimal128(new_precision, new_scale))
+}
+_ => not_impl_err!("Avg return type for {data_type}"),
+}
+}

/// AVG aggregate expression
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvgDecimal {
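The inlined `avg_return_type` mirrors Spark's result type rule for AVG over decimals, `DECIMAL(min(38, p + 4), min(38, s + 4))`, with 38 coming from `DECIMAL128_MAX_PRECISION` / `DECIMAL128_MAX_SCALE`. A standalone sketch of just that arithmetic with a couple of worked values; `spark_avg_decimal_type` is illustrative and not part of the diff:

```rust
// Spark's AVG-over-decimal widening: add 4 to precision and scale, capped at 38.
fn spark_avg_decimal_type(precision: u8, scale: i8) -> (u8, i8) {
    const MAX_PRECISION: u8 = 38;
    const MAX_SCALE: i8 = 38;
    (MAX_PRECISION.min(precision + 4), MAX_SCALE.min(scale + 4))
}

fn main() {
    // AVG over DECIMAL(10, 2) yields DECIMAL(14, 6); near the cap it saturates at 38.
    assert_eq!(spark_avg_decimal_type(10, 2), (14, 6));
    assert_eq!(spark_avg_decimal_type(37, 36), (38, 38));
}
```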
2 changes: 1 addition & 1 deletion native/spark-expr/src/math_funcs/internal/checkoverflow.rs
@@ -136,7 +136,7 @@ impl PhysicalExpr for CheckOverflow {
);

let new_v: Option<i128> = v.and_then(|v| {
-Decimal128Type::validate_decimal_precision(v, precision)
+Decimal128Type::validate_decimal_precision(v, precision, scale)
.map(|_| v)
.ok()
});
10 changes: 6 additions & 4 deletions native/spark-expr/src/math_funcs/internal/make_decimal.rs
@@ -34,7 +34,7 @@ pub fn spark_make_decimal(
match &args[0] {
ColumnarValue::Scalar(v) => match v {
ScalarValue::Int64(n) => Ok(ColumnarValue::Scalar(ScalarValue::Decimal128(
-long_to_decimal(n, precision),
+long_to_decimal(n, precision, scale),
precision,
scale,
))),
@@ -45,7 +45,7 @@
let arr = a.as_primitive::<Int64Type>();
let mut result = Decimal128Builder::new();
for v in arr.into_iter() {
-result.append_option(long_to_decimal(&v, precision))
+result.append_option(long_to_decimal(&v, precision, scale))
}
let result_type = DataType::Decimal128(precision, scale);

@@ -61,9 +61,11 @@
/// Convert the input long to decimal with the given maximum precision. If overflows, returns null
/// instead.
#[inline]
-fn long_to_decimal(v: &Option<i64>, precision: u8) -> Option<i128> {
+fn long_to_decimal(v: &Option<i64>, precision: u8, scale: i8) -> Option<i128> {
match v {
-Some(v) if validate_decimal_precision(*v as i128, precision).is_ok() => Some(*v as i128),
+Some(v) if validate_decimal_precision(*v as i128, precision, scale).is_ok() => {
+Some(*v as i128)
+}
_ => None,
}
}
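This file and `checkoverflow.rs` above only thread the extra `scale` argument through to arrow's `validate_decimal_precision`; the check `long_to_decimal` relies on is still whether the `i64`, treated as an unscaled decimal value, fits in `precision` digits (arrow's real function may additionally validate scale bounds). A dependency-free sketch of that behavior; `long_to_decimal_sketch` is illustrative, not the function in the diff:

```rust
// Null out values whose unscaled magnitude needs more than `precision` digits.
fn long_to_decimal_sketch(v: Option<i64>, precision: u8) -> Option<i128> {
    let max = 10i128.pow(precision as u32) - 1;
    v.map(i128::from).filter(|x| (-max..=max).contains(x))
}

fn main() {
    // 1234 fits in DECIMAL(4, _) but not DECIMAL(3, _); the latter becomes NULL.
    assert_eq!(long_to_decimal_sketch(Some(1234), 4), Some(1234));
    assert_eq!(long_to_decimal_sketch(Some(1234), 3), None);
    assert_eq!(long_to_decimal_sketch(None, 10), None);
}
```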
@@ -732,6 +732,7 @@ case class CometIcebergNativeScanMetadata(
nameMapping: Option[String],
tasks: java.util.List[_],
scanSchema: Any,
+tableSchema: Any,
globalFieldIdMapping: Map[String, Int],
catalogProperties: Map[String, String],
fileFormat: String)
@@ -763,6 +764,7 @@ object CometIcebergNativeScanMetadata extends Logging {
table <- getTable(scan)
tasks <- getTasks(scan)
scanSchema <- getExpectedSchema(scan)
+tableSchema <- getSchema(table)
} yield {
// nameMapping is optional - if it fails we just use None
val nameMapping = getTableProperties(table).flatMap { properties =>
@@ -787,6 +789,7 @@
nameMapping = nameMapping,
tasks = tasks,
scanSchema = scanSchema,
+tableSchema = tableSchema,
globalFieldIdMapping = globalFieldIdMapping,
catalogProperties = catalogProperties,
fileFormat = fileFormat)