Skip to content

Commit 3142ec0

Browse files
committed
Preserve dictionary encoding from parquet (#171)
1 parent 0cc0c05 commit 3142ec0

File tree

12 files changed

+1235
-226
lines changed

12 files changed

+1235
-226
lines changed

parquet/src/arrow/array_reader.rs

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,10 @@ use arrow::datatypes::{
5656
use arrow::util::bit_util;
5757

5858
use crate::arrow::converter::{
59-
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
60-
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
61-
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
62-
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
63-
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
59+
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
60+
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
61+
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
62+
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
6463
};
6564
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
6665
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
7069
use crate::column::reader::decoder::ColumnValueDecoder;
7170
use crate::column::reader::ColumnReaderImpl;
7271
use crate::data_type::{
73-
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
74-
Int32Type, Int64Type, Int96Type,
72+
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
73+
Int64Type, Int96Type,
7574
};
7675
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
7776
use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
8180
use crate::schema::visitor::TypeVisitor;
8281

8382
mod byte_array;
83+
mod byte_array_dictionary;
84+
mod dictionary_buffer;
8485
mod offset_buffer;
8586

87+
#[cfg(test)]
88+
mod test_util;
89+
8690
pub use byte_array::make_byte_array_reader;
91+
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
8792

8893
/// Array reader reads parquet data into arrow array.
8994
pub trait ArrayReader {
@@ -271,7 +276,8 @@ where
271276
.clone(),
272277
};
273278

274-
let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
279+
let record_reader =
280+
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
275281

276282
Ok(Self {
277283
data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
829835
size
830836
),
831837
ArrowType::Struct(fields) => {
832-
let struct_array = arr.as_any()
838+
let struct_array = arr
839+
.as_any()
833840
.downcast_ref::<StructArray>()
834841
.expect("Array should be a struct");
835842

836843
// Recursively call `remove_indices` on each of the struct's fields
837-
let new_columns = fields.into_iter()
844+
let new_columns = fields
845+
.into_iter()
838846
.zip(struct_array.columns())
839847
.map(|(field, column)| {
840848
let dt = field.data_type().clone();
841-
Ok((field,
842-
remove_indices(column.clone(), dt, indices.clone())?))
849+
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
843850
})
844851
.collect::<Result<Vec<_>>>()?;
845852

@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
17831790
)?,
17841791
)),
17851792
PhysicalType::BYTE_ARRAY => match arrow_type {
1786-
// TODO: Replace with optimised dictionary reader (#171)
1787-
Some(ArrowType::Dictionary(_, _)) => {
1788-
match cur_type.get_basic_info().converted_type() {
1789-
ConvertedType::UTF8 => {
1790-
let converter = Utf8Converter::new(Utf8ArrayConverter {});
1791-
Ok(Box::new(ComplexObjectArrayReader::<
1792-
ByteArrayType,
1793-
Utf8Converter,
1794-
>::new(
1795-
page_iterator,
1796-
column_desc,
1797-
converter,
1798-
arrow_type,
1799-
)?))
1800-
}
1801-
_ => {
1802-
let converter = BinaryConverter::new(BinaryArrayConverter {});
1803-
Ok(Box::new(ComplexObjectArrayReader::<
1804-
ByteArrayType,
1805-
BinaryConverter,
1806-
>::new(
1807-
page_iterator,
1808-
column_desc,
1809-
converter,
1810-
arrow_type,
1811-
)?))
1812-
}
1813-
}
1814-
}
1793+
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
1794+
page_iterator,
1795+
column_desc,
1796+
arrow_type,
1797+
null_mask_only,
1798+
),
18151799
_ => make_byte_array_reader(
18161800
page_iterator,
18171801
column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
20252009
use crate::arrow::schema::parquet_to_arrow_schema;
20262010
use crate::basic::{Encoding, Type as PhysicalType};
20272011
use crate::column::page::{Page, PageReader};
2028-
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
2012+
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
20292013
use crate::errors::Result;
20302014
use crate::file::reader::{FileReader, SerializedFileReader};
20312015
use crate::schema::parser::parse_message_type;

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 7 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -579,69 +579,18 @@ impl ByteArrayDecoderDictionary {
579579
#[cfg(test)]
580580
mod tests {
581581
use super::*;
582+
use crate::arrow::array_reader::test_util::{
583+
byte_array_all_encodings, utf8_column,
584+
};
582585
use crate::arrow::record_reader::buffer::ValuesBuffer;
583-
use crate::basic::Type as PhysicalType;
584-
use crate::data_type::{ByteArray, ByteArrayType};
585-
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
586-
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
587-
use crate::util::memory::MemTracker;
588586
use arrow::array::{Array, StringArray};
589-
use std::sync::Arc;
590-
591-
fn column() -> ColumnDescPtr {
592-
let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
593-
.with_converted_type(ConvertedType::UTF8)
594-
.build()
595-
.unwrap();
596-
597-
Arc::new(ColumnDescriptor::new(
598-
Arc::new(t),
599-
1,
600-
0,
601-
ColumnPath::new(vec![]),
602-
))
603-
}
604-
605-
fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
606-
let descriptor = column();
607-
let mem_tracker = Arc::new(MemTracker::new());
608-
let mut encoder =
609-
get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
610-
611-
encoder.put(data).unwrap();
612-
encoder.flush_buffer().unwrap()
613-
}
614587

615588
#[test]
616589
fn test_byte_array_decoder() {
617-
let data: Vec<_> = vec!["hello", "world", "a", "b"]
618-
.into_iter()
619-
.map(ByteArray::from)
620-
.collect();
621-
622-
let mut dict_encoder =
623-
DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));
624-
625-
dict_encoder.put(&data).unwrap();
626-
let encoded_rle = dict_encoder.flush_buffer().unwrap();
627-
let encoded_dictionary = dict_encoder.write_dict().unwrap();
628-
629-
// A column chunk with all the encodings!
630-
let pages = vec![
631-
(Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
632-
(
633-
Encoding::DELTA_BYTE_ARRAY,
634-
get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
635-
),
636-
(
637-
Encoding::DELTA_LENGTH_BYTE_ARRAY,
638-
get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
639-
),
640-
(Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
641-
(Encoding::RLE_DICTIONARY, encoded_rle),
642-
];
590+
let (pages, encoded_dictionary) =
591+
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);
643592

644-
let column_desc = column();
593+
let column_desc = utf8_column();
645594
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);
646595

647596
decoder
@@ -668,15 +617,9 @@ mod tests {
668617
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
669618

670619
let valid = vec![false, false, true, true, false, true, true, false, false];
671-
let rev_position_iter = valid
672-
.iter()
673-
.enumerate()
674-
.rev()
675-
.filter_map(|(i, valid)| valid.then(|| i));
676-
677620
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
678621

679-
output.pad_nulls(0, 4, valid.len(), rev_position_iter);
622+
output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
680623
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
681624
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
682625

0 commit comments

Comments
 (0)