Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false

# Criterion benchmark for the NormalizeNaNAndZero expression
# (source: benches/normalize_nan.rs). `harness = false` hands the
# entry point to criterion instead of the default libtest harness.
[[bench]]
name = "normalize_nan"
harness = false

[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
88 changes: 88 additions & 0 deletions native/spark-expr/benches/normalize_nan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for NormalizeNaNAndZero expression

use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_spark_expr::NormalizeNaNAndZero;
use std::hint::black_box;
use std::sync::Arc;

const BATCH_SIZE: usize = 8192;

/// Build a physical column expression that reads column `index` (named
/// `name`) from the input batch schema.
fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    let column = Column::new(name, index);
    Arc::new(column)
}

/// Create a batch with a single nullable Float64 column ("c1", BATCH_SIZE rows)
/// containing approximately `null_pct`% nulls, `nan_pct`% NaNs and
/// `neg_zero_pct`% negative zeros; all remaining slots hold ordinary
/// positive values. A percentage of 0 disables that category; percentages
/// above 100 are clamped to 100.
fn create_float_batch(nan_pct: usize, neg_zero_pct: usize, null_pct: usize) -> RecordBatch {
    // Distance between consecutive special values for a given percentage,
    // or None when the category is disabled. Clamping `pct` to 100 avoids
    // the divide-by-zero that a plain `100 / pct` hits for pct > 100.
    fn stride(pct: usize) -> Option<usize> {
        if pct == 0 {
            None
        } else {
            Some((100 / pct.min(100)).max(1))
        }
    }

    // True when row `i` lands on this category's stride, shifted by `offset`
    // so the three categories hit different rows. Reducing the offset modulo
    // the stride keeps small strides from never matching: with pct == 100 the
    // stride is 1 and a fixed `i % 1 == 1` check would never be true.
    fn hits(i: usize, stride: Option<usize>, offset: usize) -> bool {
        stride.map_or(false, |s| i % s == offset % s)
    }

    let null_stride = stride(null_pct);
    let nan_stride = stride(nan_pct);
    let neg_zero_stride = stride(neg_zero_pct);

    let mut values: Vec<Option<f64>> = Vec::with_capacity(BATCH_SIZE);
    for i in 0..BATCH_SIZE {
        if hits(i, null_stride, 0) {
            values.push(None);
        } else if hits(i, nan_stride, 1) {
            values.push(Some(f64::NAN));
        } else if hits(i, neg_zero_stride, 2) {
            values.push(Some(-0.0));
        } else {
            values.push(Some(i as f64 * 1.5));
        }
    }

    let array = Float64Array::from(values);
    let schema = Schema::new(vec![Field::new("c1", DataType::Float64, true)]);

    RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}

/// Benchmark `NormalizeNaNAndZero::evaluate` over Float64 batches with
/// varying densities of NaN, -0.0 and null values.
fn bench_normalize_nan_and_zero(c: &mut Criterion) {
    let mut group = c.benchmark_group("normalize_nan_and_zero");

    // (case name, nan_pct, neg_zero_pct, null_pct).
    // NOTE: the former "all_normal" case was removed — it used (0, 0, 0),
    // byte-identical input to "no_special", so it benchmarked the same thing
    // twice under two names. "mixed_10pct" was renamed to reflect that it
    // uses 5% of each special category, not 10%.
    let test_cases = [
        ("no_special", 0, 0, 0),
        ("10pct_nan", 10, 0, 0),
        ("10pct_neg_zero", 0, 10, 0),
        ("10pct_null", 0, 0, 10),
        ("mixed_5pct_each", 5, 5, 5),
    ];

    // The expression under test does not depend on the case parameters,
    // so build it once instead of once per iteration of the loop.
    let normalize_expr = Arc::new(NormalizeNaNAndZero::new(
        DataType::Float64,
        make_col("c1", 0),
    ));

    for (name, nan_pct, neg_zero_pct, null_pct) in test_cases {
        let batch = create_float_batch(nan_pct, neg_zero_pct, null_pct);

        group.bench_with_input(BenchmarkId::new("float64", name), &batch, |b, batch| {
            // black_box on both input and output keeps the optimizer from
            // const-folding the evaluation away.
            b.iter(|| black_box(normalize_expr.evaluate(black_box(batch)).unwrap()));
        });
    }

    group.finish();
}

// Register the benchmark group and let criterion generate the `main`
// entry point (Cargo.toml sets `harness = false` for this bench target).
criterion_group!(benches, bench_normalize_nan_and_zero);
criterion_main!(benches);
80 changes: 22 additions & 58 deletions native/spark-expr/src/math_funcs/internal/normalize_nan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use arrow::compute::unary;
use arrow::datatypes::{DataType, Schema};
use arrow::{
array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, Float64Array},
datatypes::{ArrowNativeType, Float32Type, Float64Type},
array::{as_primitive_array, Float32Array, Float64Array},
datatypes::{Float32Type, Float64Type},
record_batch::RecordBatch,
};
use datafusion::logical_expr::ColumnarValue;
Expand Down Expand Up @@ -78,14 +79,16 @@ impl PhysicalExpr for NormalizeNaNAndZero {

match &self.data_type {
DataType::Float32 => {
let v = eval_typed(as_primitive_array::<Float32Type>(&array));
let new_array = Float32Array::from(v);
Ok(ColumnarValue::Array(Arc::new(new_array)))
let input = as_primitive_array::<Float32Type>(&array);
// Use unary which operates directly on values buffer without intermediate allocation
let result: Float32Array = unary(input, normalize_float);
Copy link
Contributor

@comphead comphead Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 unary is fast but it requires the operation to be infallible and this is the case here.

Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Float64 => {
let v = eval_typed(as_primitive_array::<Float64Type>(&array));
let new_array = Float64Array::from(v);
Ok(ColumnarValue::Array(Arc::new(new_array)))
let input = as_primitive_array::<Float64Type>(&array);
// Use unary which operates directly on values buffer without intermediate allocation
let result: Float64Array = unary(input, normalize_float);
Ok(ColumnarValue::Array(Arc::new(result)))
}
dt => panic!("Unexpected data type {dt:?}"),
}
Expand All @@ -106,60 +109,21 @@ impl PhysicalExpr for NormalizeNaNAndZero {
}
}

fn eval_typed<V: FloatDouble, T: ArrayAccessor<Item = V>>(input: T) -> Vec<Option<V>> {
let iter = ArrayIter::new(input);
iter.map(|o| {
o.map(|v| {
if v.is_nan() {
v.nan()
} else if v.is_neg_zero() {
v.zero()
} else {
v
}
})
})
.collect()
/// Normalize a floating point value by converting all NaN representations to
/// a canonical NaN and negative zero to positive zero. This is used for
/// Spark's comparison semantics.
#[inline]
fn normalize_float<T: num::Float>(v: T) -> T {
    // Collapse every NaN bit pattern into the single canonical NaN.
    if v.is_nan() {
        return T::nan();
    }
    // IEEE-754 equality makes `-0.0 == 0.0` true, so this branch also matches
    // +0.0 — harmless, since the replacement is +0.0 either way.
    if v == T::neg_zero() {
        return T::zero();
    }
    v
}

impl Display for NormalizeNaNAndZero {
    // Render the expression for plan/debug output,
    // e.g. `FloatNormalize [child: c1@0]`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "FloatNormalize [child: {}]", self.child)
    }
}

/// Minimal abstraction over f32/f64 for NaN / negative-zero normalization:
/// classify the special values and supply their canonical replacements.
trait FloatDouble: ArrowNativeType {
    /// True when the value is any NaN bit pattern.
    fn is_nan(&self) -> bool;
    /// The canonical NaN for this type.
    fn nan(&self) -> Self;
    /// True when the value compares equal to -0.0. Note that under
    /// IEEE-754 `+0.0 == -0.0`, so this is also true for +0.0.
    fn is_neg_zero(&self) -> bool;
    /// Positive zero for this type.
    fn zero(&self) -> Self;
}

impl FloatDouble for f32 {
    fn is_nan(&self) -> bool {
        // Fully-qualified call to the inherent method; `self.is_nan()` would
        // be ambiguous-looking with the trait method of the same name.
        f32::is_nan(*self)
    }
    fn nan(&self) -> Self {
        // Canonical NaN; all other NaN payloads normalize to this constant.
        f32::NAN
    }
    fn is_neg_zero(&self) -> bool {
        // IEEE-754 equality: also true for +0.0 — harmless, since the
        // replacement value is +0.0 anyway.
        *self == -0.0
    }
    fn zero(&self) -> Self {
        0.0
    }
}
impl FloatDouble for f64 {
    fn is_nan(&self) -> bool {
        // Fully-qualified call to the inherent method; `self.is_nan()` would
        // be ambiguous-looking with the trait method of the same name.
        f64::is_nan(*self)
    }
    fn nan(&self) -> Self {
        // Canonical NaN; all other NaN payloads normalize to this constant.
        f64::NAN
    }
    fn is_neg_zero(&self) -> bool {
        // IEEE-754 equality: also true for +0.0 — harmless, since the
        // replacement value is +0.0 anyway.
        *self == -0.0
    }
    fn zero(&self) -> Self {
        0.0
    }
}
Loading