95 changes: 67 additions & 28 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -1279,35 +1279,34 @@ impl MiniBlockScheduler {

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            let dictionary_decompressor = decompressors
                .create_block_decompressor(dictionary_encoding)?
                .into();
            let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
            {
                Compression::Variable(_) => 4,
                Compression::Flat(_) => 16,
                Compression::General(_) => 1,
                Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
                    crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!(
                            "Unsupported mini-block dictionary encoding: {:?}",
                            dictionary_encoding.compression.as_ref().unwrap()
                        )
                        .into(),
                        location: location!(),
                    })
                }
            };
            Some(MiniBlockSchedulerDictionary {
                dictionary_decompressor,
                dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                dictionary_data_alignment,
                num_dictionary_items,
            })
        } else {
            None
        };
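For orientation, a minimal standalone sketch of the shape of this refactor: derive the dictionary buffer alignment from the compression variant in a single match, then construct the scheduler state once, instead of repeating the constructor in every branch. The enum, the constant's value, and the function name below are hypothetical stand-ins, not Lance's actual types.

// Illustrative sketch only; simplified stand-ins for Lance's real types.
const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8; // hypothetical placeholder value

enum DictCompression {
    Variable,
    Flat,
    General,
    InlineBitpacking,
    OutOfLineBitpacking,
}

// Map each compression variant to the byte alignment its decompressed
// dictionary buffer needs; bitpacked dictionaries fall back to the
// page-buffer minimum.
fn dictionary_data_alignment(compression: &DictCompression) -> u64 {
    match compression {
        DictCompression::Variable => 4,
        DictCompression::Flat => 16,
        DictCompression::General => 1,
        DictCompression::InlineBitpacking | DictCompression::OutOfLineBitpacking => {
            MIN_PAGE_BUFFER_ALIGNMENT
        }
    }
}

fn main() {
    assert_eq!(
        dictionary_data_alignment(&DictCompression::OutOfLineBitpacking),
        MIN_PAGE_BUFFER_ALIGNMENT
    );
}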
@@ -4903,6 +4902,7 @@ mod tests {
FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
StructuralPageScheduler,
};
use crate::compression::DefaultDecompressionStrategy;
use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
use crate::data::BlockInfo;
use crate::decoder::PageEncoding;
@@ -4911,6 +4911,7 @@
};
use crate::format::pb21;
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::ProtobufUtils21;
use crate::testing::{check_round_trip_encoding_of_data, TestCases};
use crate::version::LanceFileVersion;
use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
@@ -6070,6 +6071,44 @@ mod tests {
check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
}

#[test]
fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
let rows = 10_000;
let unique_values = 2_000;

let dictionary_encoding =
ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
let layout = pb21::MiniBlockLayout {
rep_compression: None,
def_compression: None,
value_compression: Some(ProtobufUtils21::flat(64, None)),
dictionary: Some(dictionary_encoding),
num_dictionary_items: unique_values,
layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
num_buffers: 1,
repetition_index_depth: 0,
num_items: rows,
has_large_chunk: false,
};

let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
let scheduler = super::MiniBlockScheduler::try_new(
&buffer_offsets_and_sizes,
/*priority=*/ 0,
/*items_in_page=*/ rows,
&layout,
&DefaultDecompressionStrategy::default(),
)
.unwrap();

let dictionary = scheduler.dictionary.unwrap();
assert_eq!(dictionary.num_dictionary_items, unique_values);
assert_eq!(
dictionary.dictionary_data_alignment,
crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
);
}

// Dictionary encoding decision tests
/// Helper to create FixedWidth test data block with exact cardinality stat injected
/// to ensure consistent test behavior (avoids HLL estimation error)
97 changes: 97 additions & 0 deletions rust/lance/src/dataset/tests/dataset_scanner.rs
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::sync::Arc;
use std::vec;

@@ -9,19 +10,28 @@ use lance_arrow::json::{is_arrow_json_field, json_field, JsonArray};
use lance_arrow::FixedSizeListArrayExt;

use arrow::compute::concat_batches;
use arrow_array::UInt64Array;
use arrow_array::{Array, FixedSizeListArray};
use arrow_array::{Float32Array, Int32Array, RecordBatch, RecordBatchIterator, StringArray};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef};
use futures::TryStreamExt;
use lance_arrow::SchemaExt;
use lance_core::cache::LanceCache;
use lance_encoding::decoder::DecoderPlugins;
use lance_file::reader::{describe_encoding, FileReader, FileReaderOptions};
use lance_file::version::LanceFileVersion;
use lance_index::scalar::inverted::{
query::PhraseQuery, tokenizer::InvertedIndexParams, SCORE_FIELD,
};
use lance_index::scalar::FullTextSearchQuery;
use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType};
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};
use lance_io::utils::CachedFileSize;
use lance_linalg::distance::MetricType;
use uuid::Uuid;

use crate::dataset::scanner::{DatasetRecordBatchStream, QueryFilter};
use crate::dataset::write::WriteParams;
use crate::Dataset;
use lance_index::scalar::inverted::query::FtsQuery;
use lance_index::vector::ivf::IvfBuildParams;
@@ -366,6 +376,93 @@ async fn test_scan_limit_offset_preserves_json_extension_metadata() {
assert_eq!(batch_no_offset.schema(), batch_with_offset.schema());
}

#[tokio::test]
async fn test_scan_miniblock_dictionary_out_of_line_bitpacking_does_not_panic() {
let rows: usize = 10_000;
let unique_values: usize = 2_000;
let batch_size: usize = 8_192;

let mut field_meta = HashMap::new();
field_meta.insert(
"lance-encoding:structural-encoding".to_string(),
"miniblock".to_string(),
);
field_meta.insert(
"lance-encoding:dict-size-ratio".to_string(),
"0.99".to_string(),
);

let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"d",
DataType::UInt64,
false,
)
.with_metadata(field_meta)]));

let values = (0..rows)
.map(|i| (i % unique_values) as u64)
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(schema.clone(), vec![Arc::new(UInt64Array::from(values))]).unwrap();

let uri = format!("memory://{}", Uuid::new_v4());
let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());

let write_params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_2),
..WriteParams::default()
};
let dataset = Dataset::write(reader, &uri, Some(write_params))
.await
.unwrap();

let field_id = dataset.schema().field("d").unwrap().id as u32;
let fragment = dataset.get_fragment(0).unwrap();
let data_file = fragment.data_file_for_field(field_id).unwrap();
let field_pos = data_file
.fields
.iter()
.position(|id| *id == field_id as i32)
.unwrap();
let column_idx = data_file.column_indices[field_pos] as usize;

let file_path = dataset.data_dir().child(data_file.path.as_str());
let scheduler = ScanScheduler::new(
dataset.object_store.clone(),
SchedulerConfig::max_bandwidth(&dataset.object_store),
);
let file_scheduler = scheduler
.open_file(&file_path, &CachedFileSize::unknown())
.await
.unwrap();

let cache = LanceCache::with_capacity(8 * 1024 * 1024);
let file_reader = FileReader::try_open(
file_scheduler,
None,
Arc::<DecoderPlugins>::default(),
&cache,
FileReaderOptions::default(),
)
.await
.unwrap();

let col_meta = &file_reader.metadata().column_metadatas[column_idx];
let encoding = describe_encoding(col_meta.pages.first().unwrap());
assert!(
encoding.contains("OutOfLineBitpacking") && encoding.contains("dictionary"),
"Expected a mini-block dictionary page with out-of-line bitpacking, got: {encoding}"
);

let mut scanner = dataset.scan();
scanner.batch_size(batch_size);
scanner.project(&["d"]).unwrap();

let mut stream = scanner.try_into_stream().await.unwrap();
let batch = stream.try_next().await.unwrap().unwrap();
assert_eq!(batch.num_columns(), 1);
}

async fn prepare_query_filter_dataset() -> Dataset {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int32, false),