Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
&mut self,
cur_type: Arc<Type>,
context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<ArrayReader>>> {
) -> Result<Option<Box<dyn ArrayReader>>> {
let mut new_context = context.clone();
new_context.path.append(vec![cur_type.name().to_string()]);

Expand Down
2 changes: 1 addition & 1 deletion rust/parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<T: DataType> RecordReader<T> {
}

/// Set the current page reader.
pub fn set_page_reader(&mut self, page_reader: Box<PageReader>) -> Result<()> {
pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
self.column_reader =
Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader));
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rust/parquet/src/column/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub trait PageWriter {
}

/// An iterator over pages of some specific column in a parquet file.
pub trait PageIterator: Iterator<Item = Result<Box<PageReader>>> {
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> {
/// Get schema of parquet file.
fn schema(&mut self) -> Result<SchemaDescPtr>;

Expand Down
8 changes: 4 additions & 4 deletions rust/parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub enum ColumnReader {
/// column reader will read from pages in `col_page_reader`.
pub fn get_column_reader(
col_descr: ColumnDescPtr,
col_page_reader: Box<PageReader>,
col_page_reader: Box<dyn PageReader>,
) -> ColumnReader {
match col_descr.physical_type() {
Type::BOOLEAN => ColumnReader::BoolColumnReader(ColumnReaderImpl::new(
Expand Down Expand Up @@ -106,7 +106,7 @@ pub struct ColumnReaderImpl<T: DataType> {
descr: ColumnDescPtr,
def_level_decoder: Option<LevelDecoder>,
rep_level_decoder: Option<LevelDecoder>,
page_reader: Box<PageReader>,
page_reader: Box<dyn PageReader>,
current_encoding: Option<Encoding>,

// The total number of values stored in the data page.
Expand All @@ -117,12 +117,12 @@ pub struct ColumnReaderImpl<T: DataType> {
num_decoded_values: u32,

// Cache of decoders for existing encodings
decoders: HashMap<Encoding, Box<Decoder<T>>>,
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
}

impl<T: DataType> ColumnReaderImpl<T> {
/// Creates new column reader based on column descriptor and page reader.
pub fn new(descr: ColumnDescPtr, page_reader: Box<PageReader>) -> Self {
pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
Self {
descr,
def_level_decoder: None,
Expand Down
18 changes: 9 additions & 9 deletions rust/parquet/src/column/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ macro_rules! gen_stats_section {
pub fn get_column_writer(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<PageWriter>,
page_writer: Box<dyn PageWriter>,
) -> ColumnWriter {
match descr.physical_type() {
Type::BOOLEAN => ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(
Expand Down Expand Up @@ -166,12 +166,12 @@ pub struct ColumnWriterImpl<T: DataType> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<PageWriter>,
page_writer: Box<dyn PageWriter>,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
encoder: Box<Encoder<T>>,
encoder: Box<dyn Encoder<T>>,
codec: Compression,
compressor: Option<Box<Codec>>,
compressor: Option<Box<dyn Codec>>,
// Metrics per page
num_buffered_values: u32,
num_buffered_encoded_values: u32,
Expand Down Expand Up @@ -203,7 +203,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
pub fn new(
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<PageWriter>,
page_writer: Box<dyn PageWriter>,
) -> Self {
let codec = props.compression(descr.path());
let compressor = create_codec(codec).unwrap();
Expand Down Expand Up @@ -879,7 +879,7 @@ impl<T: DataType> ColumnWriterImpl<T> {

/// Returns reference to the underlying page writer.
/// This method is intended to use in tests only.
fn get_page_writer_ref(&self) -> &Box<PageWriter> {
fn get_page_writer_ref(&self) -> &Box<dyn PageWriter> {
&self.page_writer
}

Expand Down Expand Up @@ -1842,7 +1842,7 @@ mod tests {

/// Returns column writer.
fn get_test_column_writer<T: DataType>(
page_writer: Box<PageWriter>,
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
Expand All @@ -1854,7 +1854,7 @@ mod tests {

/// Returns column reader.
fn get_test_column_reader<T: DataType>(
page_reader: Box<PageReader>,
page_reader: Box<dyn PageReader>,
max_def_level: i16,
max_rep_level: i16,
) -> ColumnReaderImpl<T> {
Expand All @@ -1879,7 +1879,7 @@ mod tests {
}

/// Returns page writer that collects pages without serializing them.
fn get_test_page_writer() -> Box<PageWriter> {
fn get_test_page_writer() -> Box<dyn PageWriter> {
Box::new(TestPageWriter {})
}

Expand Down
2 changes: 1 addition & 1 deletion rust/parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub trait Codec {
/// Given the compression type `codec`, returns a codec used to compress and decompress
/// bytes for the compression type.
/// This returns `None` if the codec type is `UNCOMPRESSED`.
pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
pub fn create_codec(codec: CodecType) -> Result<Option<Box<dyn Codec>>> {
match codec {
#[cfg(any(feature = "brotli", test))]
CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
Expand Down
6 changes: 3 additions & 3 deletions rust/parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ pub trait Decoder<T: DataType> {
pub fn get_decoder<T: DataType>(
descr: ColumnDescPtr,
encoding: Encoding,
) -> Result<Box<Decoder<T>>> {
let decoder: Box<Decoder<T>> = match encoding {
) -> Result<Box<dyn Decoder<T>>> {
let decoder: Box<dyn Decoder<T>> = match encoding {
Encoding::PLAIN => Box::new(PlainDecoder::new(descr.type_length())),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
return Err(general_err!(
Expand Down Expand Up @@ -231,7 +231,7 @@ impl<T: DataType> DictDecoder<T> {
}

/// Decodes and sets values for dictionary using `decoder` decoder.
pub fn set_dict(&mut self, mut decoder: Box<Decoder<T>>) -> Result<()> {
pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
let num_values = decoder.values_left();
self.dictionary.resize(num_values, T::T::default());
let _ = decoder.get(&mut self.dictionary)?;
Expand Down
18 changes: 12 additions & 6 deletions rust/parquet/src/encodings/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ pub fn get_encoder<T: DataType>(
desc: ColumnDescPtr,
encoding: Encoding,
mem_tracker: MemTrackerPtr,
) -> Result<Box<Encoder<T>>> {
let encoder: Box<Encoder<T>> = match encoding {
) -> Result<Box<dyn Encoder<T>>> {
let encoder: Box<dyn Encoder<T>> = match encoding {
Encoding::PLAIN => Box::new(PlainEncoder::new(desc, mem_tracker, vec![])),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
return Err(general_err!(
Expand Down Expand Up @@ -1261,8 +1261,8 @@ mod tests {
}

fn put_and_get<T: DataType>(
encoder: &mut Box<Encoder<T>>,
decoder: &mut Box<Decoder<T>>,
encoder: &mut Box<dyn Encoder<T>>,
decoder: &mut Box<dyn Decoder<T>>,
input: &[T::T],
output: &mut [T::T],
) -> Result<usize> {
Expand Down Expand Up @@ -1305,13 +1305,19 @@ mod tests {
))
}

fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<Encoder<T>> {
fn create_test_encoder<T: DataType>(
type_len: i32,
enc: Encoding,
) -> Box<dyn Encoder<T>> {
let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
let mem_tracker = Arc::new(MemTracker::new());
get_encoder(desc, enc, mem_tracker).unwrap()
}

fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<Decoder<T>> {
fn create_test_decoder<T: DataType>(
type_len: i32,
enc: Encoding,
) -> Box<dyn Decoder<T>> {
let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
get_decoder(desc, enc).unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion rust/parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl std::fmt::Display for ParquetError {
}

impl std::error::Error for ParquetError {
fn cause(&self) -> Option<&::std::error::Error> {
fn cause(&self) -> Option<&dyn ::std::error::Error> {
None
}
}
Expand Down
18 changes: 9 additions & 9 deletions rust/parquet/src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub trait FileReader {
fn num_row_groups(&self) -> usize;

/// Get the `i`th row group reader. Note this doesn't do bound check.
fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>>;
fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;

/// Get full iterator of `Row`s from a file (over all row groups).
///
Expand All @@ -84,7 +84,7 @@ pub trait RowGroupReader {
fn num_columns(&self) -> usize;

/// Get page reader for the `i`th column chunk.
fn get_column_page_reader(&self, i: usize) -> Result<Box<PageReader>>;
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;

/// Get value reader for the `i`th column chunk.
fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
Expand Down Expand Up @@ -139,13 +139,13 @@ pub trait RowGroupReader {
/// Implementation of page iterator for parquet file.
pub struct FilePageIterator {
column_index: usize,
row_group_indices: Box<Iterator<Item = usize>>,
file_reader: Arc<FileReader>,
row_group_indices: Box<dyn Iterator<Item = usize>>,
file_reader: Arc<dyn FileReader>,
}

impl FilePageIterator {
/// Creates a page iterator for all row groups in file.
pub fn new(column_index: usize, file_reader: Arc<FileReader>) -> Result<Self> {
pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
let num_row_groups = file_reader.metadata().num_row_groups();

let row_group_indices = Box::new(0..num_row_groups);
Expand All @@ -156,8 +156,8 @@ impl FilePageIterator {
/// Create page iterator from parquet file reader with only some row groups.
pub fn with_row_groups(
column_index: usize,
row_group_indices: Box<Iterator<Item = usize>>,
file_reader: Arc<FileReader>,
row_group_indices: Box<dyn Iterator<Item = usize>>,
file_reader: Arc<dyn FileReader>,
) -> Result<Self> {
// Check that column_index is valid
let num_columns = file_reader
Expand All @@ -180,9 +180,9 @@ impl FilePageIterator {
}

impl Iterator for FilePageIterator {
type Item = Result<Box<PageReader>>;
type Item = Result<Box<dyn PageReader>>;

fn next(&mut self) -> Option<Result<Box<PageReader>>> {
fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
self.row_group_indices.next().map(|row_group_index| {
self.file_reader
.get_row_group(row_group_index)
Expand Down
14 changes: 7 additions & 7 deletions rust/parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
self.metadata.num_row_groups()
}

fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>> {
fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
let row_group_metadata = self.metadata.row_group(i);
// Row groups should be processed sequentially.
let f = Arc::clone(&self.chunk_reader);
Expand Down Expand Up @@ -207,7 +207,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
}

// TODO: fix PARQUET-816
fn get_column_page_reader(&self, i: usize) -> Result<Box<PageReader>> {
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
let (col_start, col_length) = col.byte_range();
let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
Expand All @@ -232,7 +232,7 @@ pub struct SerializedPageReader<T: Read> {
buf: T,

// The compression codec for this column chunk. Only set for non-PLAIN codec.
decompressor: Option<Box<Codec>>,
decompressor: Option<Box<dyn Codec>>,

// The number of values we have seen so far.
seen_num_values: i64,
Expand Down Expand Up @@ -544,7 +544,7 @@ mod tests {
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<RowGroupReader> = row_group_reader_result.unwrap();
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
Expand All @@ -558,7 +558,7 @@ mod tests {
// TODO: test for every column
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap();
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Ok(Some(page)) = page_reader_0.get_next_page() {
let is_expected_page = match page {
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
// Test row group reader
let row_group_reader_result = reader.get_row_group(0);
assert!(row_group_reader_result.is_ok());
let row_group_reader: Box<RowGroupReader> = row_group_reader_result.unwrap();
let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
assert_eq!(
row_group_reader.num_columns(),
row_group_metadata.num_columns()
Expand All @@ -650,7 +650,7 @@ mod tests {
// TODO: test for every column
let page_reader_0_result = row_group_reader.get_column_page_reader(0);
assert!(page_reader_0_result.is_ok());
let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap();
let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Ok(Some(page)) = page_reader_0.get_next_page() {
let is_expected_page = match page {
Expand Down
18 changes: 12 additions & 6 deletions rust/parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ pub trait FileWriter {
/// There is no limit on a number of row groups in a file; however, row groups have
/// to be written sequentially. Every time the next row group is requested, the
/// previous row group must be finalised and closed using `close_row_group` method.
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>>;
fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>>;

/// Finalises and closes row group that was created using `next_row_group` method.
/// After calling this method, the next row group is available for writes.
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()>;
fn close_row_group(
&mut self,
row_group_writer: Box<dyn RowGroupWriter>,
) -> Result<()>;

/// Closes and finalises file writer.
///
Expand Down Expand Up @@ -165,7 +168,7 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {
/// Finalises active row group writer, otherwise no-op.
fn finalise_row_group_writer(
&mut self,
mut row_group_writer: Box<RowGroupWriter>,
mut row_group_writer: Box<dyn RowGroupWriter>,
) -> Result<()> {
let row_group_metadata = row_group_writer.close()?;
self.total_num_rows += row_group_metadata.num_rows();
Expand Down Expand Up @@ -229,7 +232,7 @@ impl<W: ParquetWriter> SerializedFileWriter<W> {

impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
#[inline]
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>> {
fn next_row_group(&mut self) -> Result<Box<dyn RowGroupWriter>> {
self.assert_closed()?;
self.assert_previous_writer_closed()?;
let row_group_writer = SerializedRowGroupWriter::new(
Expand All @@ -242,7 +245,10 @@ impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
}

#[inline]
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()> {
fn close_row_group(
&mut self,
row_group_writer: Box<dyn RowGroupWriter>,
) -> Result<()> {
self.assert_closed()?;
let res = self.finalise_row_group_writer(row_group_writer);
self.previous_writer_closed = res.is_ok();
Expand Down Expand Up @@ -993,7 +999,7 @@ mod tests {
}

/// Helper function to compress a slice
fn compress_helper(compressor: Option<&mut Box<Codec>>, data: &[u8]) -> Vec<u8> {
fn compress_helper(compressor: Option<&mut Box<dyn Codec>>, data: &[u8]) -> Vec<u8> {
let mut output_buf = vec![];
if let Some(cmpr) = compressor {
cmpr.compress(data, &mut output_buf).unwrap();
Expand Down
1 change: 0 additions & 1 deletion rust/parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#![allow(incomplete_features)]
#![allow(dead_code)]
#![allow(non_camel_case_types)]
#![allow(bare_trait_objects)]
#![allow(
clippy::approx_constant,
clippy::borrowed_box,
Expand Down
Loading