Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion vortex-array/src/arrow/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Datum {
})
}

pub fn with_target_datatype(
pub fn try_new_with_target_datatype(
array: &dyn Array,
target_datatype: &DataType,
) -> VortexResult<Self> {
Expand All @@ -57,6 +57,10 @@ impl Datum {
})
}
}

pub fn data_type(&self) -> &DataType {
self.array.data_type()
}
}

impl ArrowDatum for Datum {
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/compute/like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ pub(crate) fn arrow_like(
"Arrow Like: length mismatch for {}",
array.encoding_id()
);
// convert the pattern to the preferred array datatype
let lhs = Datum::try_new(array)?;
let rhs = Datum::try_new(pattern)?;
let rhs = Datum::try_new_with_target_datatype(pattern, lhs.data_type())?;

let result = match (options.negated, options.case_insensitive) {
(false, false) => arrow_string::like::like(&lhs, &rhs)?,
Expand Down
1 change: 1 addition & 0 deletions vortex-btrblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust-version = { workspace = true }
version = { workspace = true }

[dependencies]
arrow-schema = { workspace = true }
getrandom_v03 = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
Expand Down
65 changes: 65 additions & 0 deletions vortex-btrblocks/src/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
use vortex_array::arrays::{
ConstantArray, DictArray, MaskedArray, VarBinArray, VarBinViewArray, VarBinViewVTable,
};
use vortex_array::arrow::{FromArrowArray, IntoArrowArray};
use vortex_array::builders::dict::dict_encode;
use vortex_array::vtable::ValidityHelper;
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
use vortex_dtype::DType;
use vortex_error::{VortexExpect, VortexResult};
use vortex_fsst::{FSSTArray, fsst_compress, fsst_train_compressor};
use vortex_scalar::Scalar;
Expand Down Expand Up @@ -93,6 +95,7 @@ impl Compressor for StringCompressor {
fn schemes() -> &'static [&'static Self::SchemeType] {
&[
&UncompressedScheme,
&VarBinScheme,
&DictScheme,
&FSSTScheme,
&ConstantScheme,
Expand All @@ -116,6 +119,9 @@ impl<T> StringScheme for T where T: Scheme<StatsType = StringStats, CodeType = S
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

#[derive(Debug, Copy, Clone)]
pub struct VarBinScheme;

#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

Expand All @@ -137,6 +143,7 @@ const FSST_SCHEME: StringCode = StringCode(2);
const CONSTANT_SCHEME: StringCode = StringCode(3);

const SPARSE_SCHEME: StringCode = StringCode(4);
const VARBIN_SCHEME: StringCode = StringCode(5);

impl Scheme for UncompressedScheme {
type StatsType = StringStats;
Expand Down Expand Up @@ -167,6 +174,64 @@ impl Scheme for UncompressedScheme {
}
}

impl Scheme for VarBinScheme {
type StatsType = StringStats;
type CodeType = StringCode;

fn code(&self) -> StringCode {
VARBIN_SCHEME
}

fn expected_compression_ratio(
&self,
stats: &Self::StatsType,
_is_sample: bool,
_allowed_cascading: usize,
_excludes: &[StringCode],
) -> VortexResult<f64> {
if stats.src.is_empty() {
return Ok(1.0);
}

let src = stats.source();

// Calculate VarBinView size using nbytes()
let varbinview_size = src.as_ref().nbytes();

let string_bytes = src.buffers().iter().map(|b| b.len() as u64).sum::<u64>();

// Determine offset type size based on total string bytes
// Arrow/Vortex uses i32 offsets if total size < u32::MAX, otherwise i64
let offset_type_size = if string_bytes < u32::MAX as u64 { 4 } else { 8 };
let offset_bytes = (src.len() as u64 + 1) * offset_type_size;

let varbin_size = string_bytes + offset_bytes;
assert!(varbin_size > 0, "cannot be empty");

Ok(varbinview_size as f64 / varbin_size as f64)
}

fn compress(
&self,
stats: &Self::StatsType,
_is_sample: bool,
_allowed_cascading: usize,
_excludes: &[StringCode],
) -> VortexResult<ArrayRef> {
let arrow_dtype = match stats.src.dtype() {
DType::Utf8(..) => arrow_schema::DataType::Utf8,
DType::Binary(..) => arrow_schema::DataType::Binary,
_ => unreachable!("VarBinView must be Utf8 or Binary"),
};

// Convert VarBinView -> Arrow VarBin -> Vortex VarBin
let arrow_array = stats.source().to_array().into_arrow(&arrow_dtype)?;
let nullable = stats.source().dtype().is_nullable();

Ok(ArrayRef::from_arrow(arrow_array.as_ref(), nullable))
}
}

impl Scheme for DictScheme {
type StatsType = StringStats;
type CodeType = StringCode;
Expand Down
4 changes: 2 additions & 2 deletions vortex-python/python/vortex/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def scan(
57,
null
]
-- child 1 type: string_view
-- child 1 type: string
[
"Joseph",
null,
Expand Down Expand Up @@ -143,7 +143,7 @@ def scan(
[
57
]
-- child 1 type: string_view
-- child 1 type: string
[
"Mikhail"
]
Expand Down
2 changes: 1 addition & 1 deletion vortex-python/python/vortex/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def execute(
31,
null
]
-- child 1 type: string_view
-- child 1 type: string
[
null,
"Angela"
Expand Down
17 changes: 13 additions & 4 deletions vortex-python/test/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ def test_fragment_to_table(ds: vx.dataset.VortexDataset):

for f in fragments:
assert f.to_table(columns=["bool", "string"]).schema == pa.schema(
[("bool", pa.bool_()), ("string", pa.string_view())]
[("bool", pa.bool_()), ("string", pa.string())]
)
assert f.to_table(columns=["string", "bool"]).schema == pa.schema(
[("string", pa.string_view()), ("bool", pa.bool_())]
[("string", pa.string()), ("bool", pa.bool_())]
)


Expand All @@ -223,5 +223,14 @@ def test_get_fragments(ds: vx.dataset.VortexDataset):
ds_filtered = ds.filter(filter_expr)
assert ds_filtered.count_rows() == sum(f.count_rows() for f in ds_filtered.get_fragments())

assert ds.to_table() == pa.concat_tables(f.to_table() for f in ds.get_fragments())
assert ds_filtered.to_table() == pa.concat_tables(f.to_table() for f in ds_filtered.get_fragments())
# Cast fragment tables to match dataset schema before comparison
# (fragments may use different string encoding)
ds_table = ds.to_table()
fragments_table = pa.concat_tables([f.to_table().cast(ds_table.schema) for f in ds.get_fragments()])
assert ds_table == fragments_table

ds_filtered_table = ds_filtered.to_table()
filtered_fragments_table = pa.concat_tables(
[f.to_table().cast(ds_filtered_table.schema) for f in ds_filtered.get_fragments()]
)
assert ds_filtered_table == filtered_fragments_table
Loading