Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions vortex-array/src/arrays/decimal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod compute;
mod narrow;
mod ops;
mod patch;
mod serde;
Expand All @@ -12,6 +13,7 @@ use vortex_dtype::{DType, DecimalDType};
use vortex_error::{VortexExpect, VortexResult, vortex_ensure, vortex_panic};
use vortex_scalar::{DecimalValueType, NativeDecimalType};

pub use crate::arrays::decimal::narrow::narrowed_decimal;
use crate::builders::ArrayBuilder;
use crate::stats::{ArrayStats, StatsSetRef};
use crate::validity::Validity;
Expand Down
61 changes: 61 additions & 0 deletions vortex-array/src/arrays/decimal/narrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::{Itertools, MinMaxResult};
use vortex_error::VortexExpect;
use vortex_scalar::{BigCast, DecimalValueType, i256};

use crate::arrays::DecimalArray;
use crate::vtable::ValidityHelper;

/// Narrows `$array`, whose values are stored as `$src`, to the first type in the `$dst` list
/// that can represent both the smallest and the largest stored value.
///
/// The macro always `return`s from the enclosing function: either a new `DecimalArray` built
/// on the first fitting `$dst` (keeping the original decimal dtype and validity), or `$array`
/// unchanged when its buffer is empty or no `$dst` fits. List the `$dst` candidates from
/// narrowest to widest so that the smallest fitting type wins.
macro_rules! try_downcast {
    ($array:expr, from: $src:ty, to: $($dst:ty),*) => {{
        // Only the extremes are probed; the conversion below expects every value between
        // them to fit as well.
        let minmax = $array.buffer::<$src>().iter().copied().minmax();
        let (min, max) = match minmax {
            // Nothing to inspect, so hand the array back as it is.
            MinMaxResult::NoElements => return $array,
            // A lone value is its own minimum and maximum, so it can be narrowed like any
            // other array instead of being skipped.
            MinMaxResult::OneElement(only) => (only, only),
            MinMaxResult::MinMax(min, max) => (min, max),
        };

        $(
            if <$dst as BigCast>::from(min).is_some() && <$dst as BigCast>::from(max).is_some() {
                return DecimalArray::new::<$dst>(
                    $array
                        .buffer::<$src>()
                        .into_iter()
                        .map(|v| <$dst as BigCast>::from(v).vortex_expect("decimal conversion failure"))
                        .collect(),
                    $array.decimal_dtype(),
                    $array.validity().clone(),
                );
            }
        )*

        // No candidate type can hold the full value range.
        return $array
    }};
}

/// Re-encode a decimal array with a narrower value type when one is available.
///
/// Dispatches on the array's current value type and hands the array to `try_downcast!`
/// together with the list of narrower candidate types. Arrays that are already `i8`, or
/// whose value type is not listed here, are returned unchanged.
pub fn narrowed_decimal(decimal_array: DecimalArray) -> DecimalArray {
    let current = decimal_array.values_type();
    match current {
        // Each `try_downcast!` arm returns from this function directly.
        DecimalValueType::I256 => {
            try_downcast!(decimal_array, from: i256, to: i8, i16, i32, i64, i128)
        }
        DecimalValueType::I128 => try_downcast!(decimal_array, from: i128, to: i8, i16, i32, i64),
        DecimalValueType::I64 => try_downcast!(decimal_array, from: i64, to: i8, i16, i32),
        DecimalValueType::I32 => try_downcast!(decimal_array, from: i32, to: i8, i16),
        DecimalValueType::I16 => try_downcast!(decimal_array, from: i16, to: i8),
        // Already the narrowest type; there is nothing smaller to try.
        DecimalValueType::I8 => decimal_array,
        _ => decimal_array,
    }
}
59 changes: 3 additions & 56 deletions vortex-btrblocks/src/decimal.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::{Itertools, MinMaxResult};
use vortex_array::ArrayRef;
use vortex_array::arrays::{DecimalArray, PrimitiveArray};
use vortex_array::arrays::{DecimalArray, PrimitiveArray, narrowed_decimal};
use vortex_array::vtable::ValidityHelper;
use vortex_decimal_byte_parts::DecimalBytePartsArray;
use vortex_error::{VortexExpect, VortexResult};
use vortex_scalar::{BigCast, DecimalValueType, i256};
use vortex_error::VortexResult;
use vortex_scalar::DecimalValueType;

use crate::{Compressor, IntCompressor, MAX_CASCADE};

Expand All @@ -28,55 +27,3 @@ pub fn compress_decimal(decimal: &DecimalArray) -> VortexResult<ArrayRef> {

DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype()).map(|d| d.to_array())
}

/// Narrows `$array`, whose values are stored as `$src`, to the first type in the `$dst` list
/// that can represent both the smallest and the largest stored value.
///
/// The macro always `return`s from the enclosing function: either a new `DecimalArray` built
/// on the first fitting `$dst` (keeping the original decimal dtype and validity), or `$array`
/// unchanged when its buffer is empty or no `$dst` fits. List the `$dst` candidates from
/// narrowest to widest so that the smallest fitting type wins.
macro_rules! try_downcast {
    ($array:expr, from: $src:ty, to: $($dst:ty),*) => {{
        // Only the extremes are probed; the conversion below expects every value between
        // them to fit as well.
        let minmax = $array.buffer::<$src>().iter().copied().minmax();
        let (min, max) = match minmax {
            // Nothing to inspect, so hand the array back as it is.
            MinMaxResult::NoElements => return $array,
            // A lone value is its own minimum and maximum, so it can be narrowed like any
            // other array instead of being skipped.
            MinMaxResult::OneElement(only) => (only, only),
            MinMaxResult::MinMax(min, max) => (min, max),
        };

        $(
            if <$dst as BigCast>::from(min).is_some() && <$dst as BigCast>::from(max).is_some() {
                return DecimalArray::new::<$dst>(
                    $array
                        .buffer::<$src>()
                        .into_iter()
                        .map(|v| <$dst as BigCast>::from(v).vortex_expect("decimal conversion failure"))
                        .collect(),
                    $array.decimal_dtype(),
                    $array.validity().clone(),
                );
            }
        )*

        // No candidate type can hold the full value range.
        return $array
    }};
}

/// Re-encode a decimal array with a narrower value type when one is available.
///
/// Dispatches on the array's current value type and hands the array to `try_downcast!`
/// together with the list of narrower candidate types. Arrays that are already `i8`, or
/// whose value type is not listed here, are returned unchanged.
fn narrowed_decimal(decimal_array: DecimalArray) -> DecimalArray {
    let current = decimal_array.values_type();
    match current {
        // Each `try_downcast!` arm returns from this function directly.
        DecimalValueType::I256 => {
            try_downcast!(decimal_array, from: i256, to: i8, i16, i32, i64, i128)
        }
        DecimalValueType::I128 => try_downcast!(decimal_array, from: i128, to: i8, i16, i32, i64),
        DecimalValueType::I64 => try_downcast!(decimal_array, from: i64, to: i8, i16, i32),
        DecimalValueType::I32 => try_downcast!(decimal_array, from: i32, to: i8, i16),
        DecimalValueType::I16 => try_downcast!(decimal_array, from: i16, to: i8),
        // Already the narrowest type; there is nothing smaller to try.
        DecimalValueType::I8 => decimal_array,
        _ => decimal_array,
    }
}
1 change: 1 addition & 0 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing = { workspace = true }
vortex-array = { workspace = true }
vortex-btrblocks = { workspace = true }
vortex-buffer = { workspace = true }
vortex-decimal-byte-parts = { workspace = true }
vortex-dict = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
Expand Down
80 changes: 60 additions & 20 deletions vortex-layout/src/layouts/compact.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use vortex_array::arrays::{ExtensionArray, ListArray, StructArray};
use vortex_array::arrays::{
ExtensionArray, FixedSizeListArray, ListArray, PrimitiveArray, StructArray, narrowed_decimal,
};
use vortex_array::vtable::ValidityHelper;
use vortex_array::{Array, ArrayRef, Canonical, IntoArray};
use vortex_decimal_byte_parts::DecimalBytePartsArray;
use vortex_dtype::PType;
use vortex_error::VortexResult;
use vortex_pco::PcoArray;
use vortex_scalar::DecimalValueType;
use vortex_zstd::ZstdArray;

fn is_pco_number_type(ptype: PType) -> bool {
Expand Down Expand Up @@ -62,32 +66,54 @@ impl CompactCompressor {

/// Compress a single array using the compact strategy
pub fn compress_canonical(&self, canonical: Canonical) -> VortexResult<ArrayRef> {
match canonical {
let uncompressed_nbytes = canonical.as_ref().nbytes();
let compressed = match &canonical {
// TODO compress BoolArrays
// TODO compress DecimalArrays
Canonical::Primitive(primitive) => {
// pco for applicable numbers, zstd for everything else
let ptype = primitive.ptype();

if is_pco_number_type(ptype) {
let pco_array =
PcoArray::from_primitive(&primitive, self.pco_level, self.values_per_page)?;
Ok(pco_array.into_array())
PcoArray::from_primitive(primitive, self.pco_level, self.values_per_page)?;
pco_array.into_array()
} else {
let zstd_array = ZstdArray::from_primitive(
&primitive,
primitive,
self.zstd_level,
self.values_per_page,
)?;
Ok(zstd_array.into_array())
zstd_array.into_array()
}
}
Canonical::Decimal(decimal) => {
let decimal = narrowed_decimal(decimal.clone());
let validity = decimal.validity();
let int_values = match decimal.values_type() {
DecimalValueType::I8 => {
PrimitiveArray::new(decimal.buffer::<i8>(), validity.clone())
}
DecimalValueType::I16 => {
PrimitiveArray::new(decimal.buffer::<i16>(), validity.clone())
}
DecimalValueType::I32 => {
PrimitiveArray::new(decimal.buffer::<i32>(), validity.clone())
}
DecimalValueType::I64 => {
PrimitiveArray::new(decimal.buffer::<i64>(), validity.clone())
}
_ => {
// Vortex lacks support for i128 and i256.
return Ok(canonical.into_array());
}
};
let compressed = self.compress_canonical(Canonical::Primitive(int_values))?;
DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype())?.to_array()
}
Canonical::VarBinView(vbv) => {
// always zstd
Ok(
ZstdArray::from_var_bin_view(&vbv, self.zstd_level, self.values_per_page)?
.into_array(),
)
ZstdArray::from_var_bin_view(vbv, self.zstd_level, self.values_per_page)?
.into_array()
}
Canonical::Struct(struct_array) => {
// recurse
Expand All @@ -97,37 +123,51 @@ impl CompactCompressor {
.map(|field| self.compress(field))
.collect::<VortexResult<Vec<_>>>()?;

Ok(StructArray::try_new(
StructArray::try_new(
struct_array.names().clone(),
fields,
struct_array.len(),
struct_array.validity().clone(),
)?
.into_array())
.into_array()
}
Canonical::List(list_array) => {
// recurse
let compressed_elems = self.compress(list_array.elements())?;
let compressed_offsets = self.compress(list_array.offsets())?;

Ok(ListArray::try_new(
ListArray::try_new(
compressed_elems,
compressed_offsets,
list_array.validity().clone(),
)?
.into_array())
.into_array()
}
Canonical::FixedSizeList(list_array) => {
// recurse
let compressed_elems = self.compress(list_array.elements())?;

FixedSizeListArray::try_new(
compressed_elems,
list_array.list_size(),
list_array.validity().clone(),
list_array.len(),
)?
.into_array()
}
Canonical::Extension(ext_array) => {
// recurse
let compressed_storage = self.compress(ext_array.storage())?;

Ok(
ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage)
.into_array(),
)
ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage).into_array()
}
other => Ok(other.into_array()),
_ => return Ok(canonical.into_array()),
};

if compressed.nbytes() >= uncompressed_nbytes {
return Ok(canonical.into_array());
}
Ok(compressed)
}
}

Expand Down
Loading