-
Couldn't load subscription status.
- Fork 1k
Add Append Column API (#4155) #4269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ | |
| use crate::bloom_filter::Sbbf; | ||
| use crate::format as parquet; | ||
| use crate::format::{ColumnIndex, OffsetIndex, RowGroup}; | ||
| use std::io::{BufWriter, IoSlice}; | ||
| use std::io::{BufWriter, IoSlice, Read}; | ||
| use std::{io::Write, sync::Arc}; | ||
| use thrift::protocol::{TCompactOutputProtocol, TSerializable}; | ||
|
|
||
|
|
@@ -35,6 +35,7 @@ use crate::column::{ | |
| }; | ||
| use crate::data_type::DataType; | ||
| use crate::errors::{ParquetError, Result}; | ||
| use crate::file::reader::ChunkReader; | ||
| use crate::file::{ | ||
| metadata::*, properties::WriterPropertiesPtr, | ||
| statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC, | ||
|
|
@@ -423,27 +424,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { | |
| } | ||
| } | ||
|
|
||
| /// Returns the next column writer, if available, using the factory function; | ||
| /// otherwise returns `None`. | ||
| pub(crate) fn next_column_with_factory<'b, F, C>( | ||
| &'b mut self, | ||
| factory: F, | ||
| ) -> Result<Option<C>> | ||
| where | ||
| F: FnOnce( | ||
| ColumnDescPtr, | ||
| &'b WriterPropertiesPtr, | ||
| Box<dyn PageWriter + 'b>, | ||
| OnCloseColumnChunk<'b>, | ||
| ) -> Result<C>, | ||
| { | ||
| self.assert_previous_writer_closed()?; | ||
|
|
||
| if self.column_index >= self.descr.num_columns() { | ||
| return Ok(None); | ||
| } | ||
| let page_writer = Box::new(SerializedPageWriter::new(self.buf)); | ||
| /// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any | ||
| fn next_column_desc(&mut self) -> Option<ColumnDescPtr> { | ||
| let ret = self.descr.columns().get(self.column_index)?.clone(); | ||
| self.column_index += 1; | ||
| Some(ret) | ||
| } | ||
|
|
||
| /// Returns [`OnCloseColumnChunk`] for the next writer | ||
| fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) { | ||
| let total_bytes_written = &mut self.total_bytes_written; | ||
| let total_uncompressed_bytes = &mut self.total_uncompressed_bytes; | ||
| let total_rows_written = &mut self.total_rows_written; | ||
|
|
@@ -475,28 +464,115 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { | |
|
|
||
| Ok(()) | ||
| }; | ||
| (self.buf, Box::new(on_close)) | ||
| } | ||
|
|
||
| let column = self.descr.column(self.column_index); | ||
| self.column_index += 1; | ||
|
|
||
| Ok(Some(factory( | ||
| column, | ||
| &self.props, | ||
| page_writer, | ||
| Box::new(on_close), | ||
| )?)) | ||
| /// Returns the next column writer, if available, using the factory function; | ||
| /// otherwise returns `None`. | ||
| pub(crate) fn next_column_with_factory<'b, F, C>( | ||
| &'b mut self, | ||
| factory: F, | ||
| ) -> Result<Option<C>> | ||
| where | ||
| F: FnOnce( | ||
| ColumnDescPtr, | ||
| WriterPropertiesPtr, | ||
| Box<dyn PageWriter + 'b>, | ||
| OnCloseColumnChunk<'b>, | ||
| ) -> Result<C>, | ||
| { | ||
| self.assert_previous_writer_closed()?; | ||
| Ok(match self.next_column_desc() { | ||
| Some(column) => { | ||
| let props = self.props.clone(); | ||
| let (buf, on_close) = self.get_on_close(); | ||
| let page_writer = Box::new(SerializedPageWriter::new(buf)); | ||
| Some(factory(column, props, page_writer, Box::new(on_close))?) | ||
| } | ||
| None => None, | ||
| }) | ||
| } | ||
|
|
||
| /// Returns the next column writer, if available; otherwise returns `None`. | ||
| /// In case of any IO error or Thrift error, or if row group writer has already been | ||
| /// closed returns `Err`. | ||
| pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> { | ||
| self.next_column_with_factory(|descr, props, page_writer, on_close| { | ||
| let column_writer = get_column_writer(descr, props.clone(), page_writer); | ||
| let column_writer = get_column_writer(descr, props, page_writer); | ||
| Ok(SerializedColumnWriter::new(column_writer, Some(on_close))) | ||
| }) | ||
| } | ||
|
|
||
| /// Append an encoded column chunk from another source without decoding it | ||
| /// | ||
| /// This can be used for efficiently concatenating or projecting parquet data, | ||
| /// or encoding parquet data to temporary in-memory buffers | ||
| /// | ||
| /// See [`Self::next_column`] for writing data that isn't already encoded | ||
| pub fn append_column<R: ChunkReader>( | ||
| &mut self, | ||
| reader: &R, | ||
| mut close: ColumnCloseResult, | ||
| ) -> Result<()> { | ||
| self.assert_previous_writer_closed()?; | ||
| let desc = self.next_column_desc().ok_or_else(|| { | ||
| general_err!("exhausted columns in SerializedRowGroupWriter") | ||
| })?; | ||
|
|
||
| let metadata = close.metadata; | ||
|
|
||
| if metadata.column_descr() != desc.as_ref() { | ||
| return Err(general_err!( | ||
| "column descriptor mismatch, expected {:?} got {:?}", | ||
| desc, | ||
| metadata.column_descr() | ||
| )); | ||
| } | ||
|
|
||
| let src_dictionary_offset = metadata.dictionary_page_offset(); | ||
| let src_data_offset = metadata.data_page_offset(); | ||
| let src_offset = src_dictionary_offset.unwrap_or(src_data_offset); | ||
| let src_length = metadata.compressed_size(); | ||
|
|
||
| let write_offset = self.buf.bytes_written(); | ||
| let mut read = reader.get_read(src_offset as _)?.take(src_length as _); | ||
| let write_length = std::io::copy(&mut read, &mut self.buf)?; | ||
|
|
||
| if src_length as u64 != write_length { | ||
| return Err(general_err!( | ||
| "Failed to splice column data, expected {read_length} got {write_length}" | ||
| )); | ||
| } | ||
|
|
||
| let file_offset = self.buf.bytes_written() as i64; | ||
|
|
||
| let map_offset = |x| x - src_offset + write_offset as i64; | ||
| let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr()) | ||
| .set_compression(metadata.compression()) | ||
| .set_encodings(metadata.encodings().clone()) | ||
| .set_file_offset(file_offset) | ||
| .set_total_compressed_size(metadata.compressed_size()) | ||
| .set_total_uncompressed_size(metadata.uncompressed_size()) | ||
| .set_num_values(metadata.num_values()) | ||
| .set_data_page_offset(map_offset(src_data_offset)) | ||
| .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)); | ||
|
|
||
| if let Some(statistics) = metadata.statistics() { | ||
| builder = builder.set_statistics(statistics.clone()) | ||
| } | ||
| close.metadata = builder.build()?; | ||
|
|
||
| if let Some(offsets) = close.offset_index.as_mut() { | ||
| for location in &mut offsets.page_locations { | ||
| location.offset = map_offset(location.offset) | ||
| } | ||
| } | ||
|
|
||
| SerializedPageWriter::new(self.buf).write_metadata(&metadata)?; | ||
| let (_, on_close) = self.get_on_close(); | ||
| on_close(close) | ||
| } | ||
|
|
||
| /// Closes this row group writer and returns row group metadata. | ||
| pub fn close(mut self) -> Result<RowGroupMetaDataPtr> { | ||
| if self.row_group_metadata.is_none() { | ||
|
|
@@ -516,9 +592,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { | |
| if let Some(on_close) = self.on_close.take() { | ||
| on_close( | ||
| metadata, | ||
| self.bloom_filters.clone(), | ||
| self.column_indexes.clone(), | ||
| self.offset_indexes.clone(), | ||
| self.bloom_filters, | ||
| self.column_indexes, | ||
| self.offset_indexes, | ||
| )? | ||
| } | ||
| } | ||
|
|
@@ -720,9 +796,11 @@ mod tests { | |
|
|
||
| use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; | ||
| use crate::column::page::PageReader; | ||
| use crate::column::reader::get_typed_column_reader; | ||
| use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; | ||
| use crate::data_type::{BoolType, Int32Type}; | ||
| use crate::file::reader::ChunkReader; | ||
| use crate::file::serialized_reader::ReadOptionsBuilder; | ||
| use crate::file::{ | ||
| properties::{ReaderProperties, WriterProperties, WriterVersion}, | ||
| reader::{FileReader, SerializedFileReader, SerializedPageReader}, | ||
|
|
@@ -1540,4 +1618,91 @@ mod tests { | |
| assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref())); | ||
| assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref())); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spliced_write() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is modeled on what #3871 will need to do within the ArrowWriter |
||
| let message_type = " | ||
| message test_schema { | ||
| REQUIRED INT32 i32 (INTEGER(32,true)); | ||
| REQUIRED INT32 u32 (INTEGER(32,false)); | ||
| } | ||
| "; | ||
| let schema = Arc::new(parse_message_type(message_type).unwrap()); | ||
| let props = Arc::new(WriterProperties::builder().build()); | ||
|
|
||
| let mut file = Vec::with_capacity(1024); | ||
| let mut file_writer = | ||
| SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap(); | ||
|
|
||
| let columns = file_writer.descr.columns(); | ||
| let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns | ||
| .iter() | ||
| .map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None)) | ||
| .collect(); | ||
|
|
||
| let mut column_state_slice = column_state.as_mut_slice(); | ||
| let mut column_writers = Vec::with_capacity(columns.len()); | ||
| for c in columns { | ||
| let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is somewhat obtuse, but is necessary to make the lifetimes work |
||
| column_state_slice = tail; | ||
|
|
||
| let page_writer = Box::new(SerializedPageWriter::new(buf)); | ||
| let col_writer = get_column_writer(c.clone(), props.clone(), page_writer); | ||
| column_writers.push(SerializedColumnWriter::new( | ||
| col_writer, | ||
| Some(Box::new(|on_close| { | ||
| *out = Some(on_close); | ||
| Ok(()) | ||
| })), | ||
| )); | ||
| } | ||
|
|
||
| let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]]; | ||
|
|
||
| // Interleaved writing to the column writers | ||
| for (writer, batch) in column_writers.iter_mut().zip(column_data) { | ||
| let writer = writer.typed::<Int32Type>(); | ||
| writer.write_batch(&batch, None, None).unwrap(); | ||
| } | ||
|
|
||
| // Close the column writers | ||
| for writer in column_writers { | ||
| writer.close().unwrap() | ||
| } | ||
|
|
||
| // Splice column data into a row group | ||
| let mut row_group_writer = file_writer.next_row_group().unwrap(); | ||
| for (write, close) in column_state { | ||
| let buf = Bytes::from(write.into_inner().unwrap()); | ||
| row_group_writer | ||
| .append_column(&buf, close.unwrap()) | ||
| .unwrap(); | ||
| } | ||
| row_group_writer.close().unwrap(); | ||
| file_writer.close().unwrap(); | ||
|
|
||
| // Check data was written correctly | ||
| let file = Bytes::from(file); | ||
| let test_read = |reader: SerializedFileReader<Bytes>| { | ||
| let row_group = reader.get_row_group(0).unwrap(); | ||
|
|
||
| let mut out = [0; 4]; | ||
| let c1 = row_group.get_column_reader(0).unwrap(); | ||
| let mut c1 = get_typed_column_reader::<Int32Type>(c1); | ||
| c1.read_batch(4, None, None, &mut out).unwrap(); | ||
| assert_eq!(out, column_data[0]); | ||
|
|
||
| let c2 = row_group.get_column_reader(1).unwrap(); | ||
| let mut c2 = get_typed_column_reader::<Int32Type>(c2); | ||
| c2.read_batch(4, None, None, &mut out).unwrap(); | ||
| assert_eq!(out, column_data[1]); | ||
| }; | ||
|
|
||
| let reader = SerializedFileReader::new(file.clone()).unwrap(); | ||
| test_read(reader); | ||
|
|
||
| let options = ReadOptionsBuilder::new().with_page_index().build(); | ||
| let reader = SerializedFileReader::new_with_options(file, options).unwrap(); | ||
| test_read(reader); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a drive-by cleanup. Given it is going to be cloned anyway, might as well just pass the cloned type (this API is crate private)