Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ pub enum SortOrder {
UNSIGNED,
/// Comparison is undefined.
UNDEFINED,
/// Use IEEE 754 total order.
TOTAL_ORDER,
}

impl SortOrder {
Expand All @@ -554,17 +556,43 @@ pub enum ColumnOrder {
/// Column uses the order defined by its logical or physical type
/// (if there is no logical type), parquet-format 2.4.0+.
TYPE_DEFINED_ORDER(SortOrder),
/// Column ordering to use for floating point types.
IEEE_754_TOTAL_ORDER,
/// Undefined column order, means legacy behaviour before parquet-format 2.4.0.
/// Sort order is always SIGNED.
UNDEFINED,
}

impl ColumnOrder {
/// Returns sort order for a physical/logical type.
/// Resolves the sort order for a column from its logical, converted and physical types.
///
/// When `ieee754_total_order` is `true`, floating point columns (a `Float16` logical type, or a
/// `FLOAT`/`DOUBLE` physical type) are reported as [`SortOrder::TOTAL_ORDER`]. Every other
/// combination defers to [`ColumnOrder::get_type_defined_sort_order`].
pub fn get_sort_order(
    logical_type: Option<LogicalType>,
    converted_type: ConvertedType,
    physical_type: Type,
    ieee754_total_order: bool,
) -> SortOrder {
    // A column counts as floating point if either its logical type is Float16 or its
    // physical type is FLOAT / DOUBLE.
    let is_floating_point = matches!(logical_type, Some(LogicalType::Float16))
        || matches!(physical_type, Type::FLOAT | Type::DOUBLE);

    if ieee754_total_order && is_floating_point {
        return SortOrder::TOTAL_ORDER;
    }

    // Not a floating point column, or total order was not requested: use the type defined order.
    ColumnOrder::get_type_defined_sort_order(logical_type, converted_type, physical_type)
}

/// Returns the type defined sort order for a physical/logical type.
pub fn get_type_defined_sort_order(
logical_type: Option<LogicalType>,
converted_type: ConvertedType,
physical_type: Type,
) -> SortOrder {
// TODO: Should this take converted and logical type, for compatibility?
match logical_type {
Expand Down Expand Up @@ -655,6 +683,7 @@ impl ColumnOrder {
pub fn sort_order(&self) -> SortOrder {
    match self {
        // Legacy files without a column order are treated as SIGNED.
        ColumnOrder::UNDEFINED => SortOrder::SIGNED,
        // The IEEE 754 column order maps directly onto the total order sort order.
        ColumnOrder::IEEE_754_TOTAL_ORDER => SortOrder::TOTAL_ORDER,
        // A type defined order already carries its sort order.
        ColumnOrder::TYPE_DEFINED_ORDER(order) => *order,
    }
}
Expand Down Expand Up @@ -2151,7 +2180,11 @@ mod tests {
fn check_sort_order(types: Vec<LogicalType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(Some(tpe), ConvertedType::NONE, Type::BYTE_ARRAY),
ColumnOrder::get_type_defined_sort_order(
Some(tpe),
ConvertedType::NONE,
Type::BYTE_ARRAY
),
expected_order
);
}
Expand Down Expand Up @@ -2246,7 +2279,7 @@ mod tests {
fn check_sort_order(types: Vec<ConvertedType>, expected_order: SortOrder) {
for tpe in types {
assert_eq!(
ColumnOrder::get_sort_order(None, tpe, Type::BYTE_ARRAY),
ColumnOrder::get_type_defined_sort_order(None, tpe, Type::BYTE_ARRAY),
expected_order
);
}
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ struct Args {
/// Sets whether to coerce Arrow types to match Parquet specification
#[clap(long)]
coerce_types: Option<bool>,

/// Sets whether to use IEEE 754 total order for floating point columns
#[clap(long)]
total_order: Option<bool>,
}

fn main() {
Expand Down Expand Up @@ -382,6 +386,9 @@ fn main() {
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
if let Some(value) = args.total_order {
writer_properties_builder = writer_properties_builder.set_ieee754_total_order(value);
}
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
Expand Down
25 changes: 17 additions & 8 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use bytes::Bytes;
use half::f16;

use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::basic::{ConvertedType, Encoding, LogicalType, SortOrder, Type};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
Expand All @@ -28,7 +28,9 @@ use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::schema::types::ColumnDescPtr;

use super::OrderedColumnDescriptor;

/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
pub trait ColumnValues {
Expand Down Expand Up @@ -126,7 +128,7 @@ pub trait ColumnValueEncoder {
pub struct ColumnValueEncoderImpl<T: DataType> {
encoder: Box<dyn Encoder<T>>,
dict_encoder: Option<DictEncoder<T>>,
descr: ColumnDescPtr,
descr: OrderedColumnDescriptor,
num_values: usize,
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
Expand Down Expand Up @@ -201,10 +203,12 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

let descr = OrderedColumnDescriptor::new(descr.clone(), props.ieee754_total_order());

Ok(Self {
encoder,
dict_encoder,
descr: descr.clone(),
descr,
num_values: 0,
statistics_enabled,
bloom_filter,
Expand Down Expand Up @@ -309,22 +313,23 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
}
}

fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
fn get_min_max<'a, T, I>(descr: &OrderedColumnDescriptor, mut iter: I) -> Option<(T, T)>
where
T: ParquetValueType + 'a,
I: Iterator<Item = &'a T>,
{
let ieee754_total_order = descr.sort_order == SortOrder::TOTAL_ORDER;
let first = loop {
let next = iter.next()?;
if !is_nan(descr, next) {
if ieee754_total_order || !is_nan(&descr.descr, next) {
break next;
}
};

let mut min = first;
let mut max = first;
for val in iter {
if is_nan(descr, val) {
if !ieee754_total_order && is_nan(&descr.descr, val) {
continue;
}
if compare_greater(descr, min, val) {
Expand All @@ -350,7 +355,11 @@ where
}

#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
fn replace_zero<T: ParquetValueType>(val: &T, descr: &OrderedColumnDescriptor, replace: f32) -> T {
if descr.sort_order == SortOrder::TOTAL_ORDER {
return val.clone();
}

match T::PHYSICAL_TYPE {
Type::FLOAT if f32::from_le_bytes(val.as_bytes().try_into().unwrap()) == 0.0 => {
T::try_from_le_slice(&f32::to_le_bytes(replace)).unwrap()
Expand Down
Loading
Loading