4 changes: 2 additions & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -104,12 +104,12 @@ impl<'a> ByteArrayWriter<'a> {
/// Returns a new [`ByteArrayWriter`]
pub fn new(
descr: ColumnDescPtr,
props: &'a WriterPropertiesPtr,
props: WriterPropertiesPtr,
@tustvold (Contributor Author) commented on May 23, 2023:

This was a drive-by cleanup. Given it is going to be cloned anyway, we might as well just pass it by value (this API is crate-private).

page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Result<Self> {
Ok(Self {
writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
writer: GenericColumnWriter::new(descr, props, page_writer),
on_close: Some(on_close),
})
}
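
The comment above argues that, since the properties are cloned regardless, the crate-private constructor may as well take the pointer by value. A minimal standalone sketch of that trade-off (the `WriterProperties`/`WriterPropertiesPtr` definitions below are local stand-ins mirroring the crate's `Arc`-based alias, not the real types):

```rust
use std::sync::Arc;

// Stand-ins for the parquet types, just to keep the example self-contained.
struct WriterProperties;
type WriterPropertiesPtr = Arc<WriterProperties>;

// Taking a reference forces the clone to happen inside the callee.
fn new_by_ref(props: &WriterPropertiesPtr) -> WriterPropertiesPtr {
    props.clone()
}

// Taking the Arc by value moves the clone (if one is even needed) to the caller.
fn new_by_value(props: WriterPropertiesPtr) -> WriterPropertiesPtr {
    props
}

fn main() {
    let props: WriterPropertiesPtr = Arc::new(WriterProperties);
    let _a = new_by_ref(&props);          // callee clones
    let _b = new_by_value(props.clone()); // caller clones explicitly
    let _c = new_by_value(props);         // last owner handed over, no clone at all
}
```

Either way only the reference count changes; passing by value simply makes the cost explicit at the call site and lets a caller that no longer needs the `Arc` avoid the clone entirely.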
233 changes: 199 additions & 34 deletions parquet/src/file/writer.rs
@@ -21,7 +21,7 @@
use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use std::io::{BufWriter, IoSlice};
use std::io::{BufWriter, IoSlice, Read};
use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TSerializable};

@@ -35,6 +35,7 @@ use crate::column::{
};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
use crate::file::reader::ChunkReader;
use crate::file::{
metadata::*, properties::WriterPropertiesPtr,
statistics::to_thrift as statistics_to_thrift, PARQUET_MAGIC,
@@ -423,27 +424,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
}
}

/// Returns the next column writer, if available, using the factory function;
/// otherwise returns `None`.
pub(crate) fn next_column_with_factory<'b, F, C>(
&'b mut self,
factory: F,
) -> Result<Option<C>>
where
F: FnOnce(
ColumnDescPtr,
&'b WriterPropertiesPtr,
Box<dyn PageWriter + 'b>,
OnCloseColumnChunk<'b>,
) -> Result<C>,
{
self.assert_previous_writer_closed()?;

if self.column_index >= self.descr.num_columns() {
return Ok(None);
}
let page_writer = Box::new(SerializedPageWriter::new(self.buf));
/// Advance `self.column_index` returning the next [`ColumnDescPtr`] if any
fn next_column_desc(&mut self) -> Option<ColumnDescPtr> {
let ret = self.descr.columns().get(self.column_index)?.clone();
self.column_index += 1;
Some(ret)
}

/// Returns the underlying [`TrackedWrite`] and an [`OnCloseColumnChunk`] callback for the next column writer
fn get_on_close(&mut self) -> (&mut TrackedWrite<W>, OnCloseColumnChunk<'_>) {
let total_bytes_written = &mut self.total_bytes_written;
let total_uncompressed_bytes = &mut self.total_uncompressed_bytes;
let total_rows_written = &mut self.total_rows_written;
@@ -475,28 +464,115 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {

Ok(())
};
(self.buf, Box::new(on_close))
}

let column = self.descr.column(self.column_index);
self.column_index += 1;

Ok(Some(factory(
column,
&self.props,
page_writer,
Box::new(on_close),
)?))
/// Returns the next column writer, if available, using the factory function;
/// otherwise returns `None`.
pub(crate) fn next_column_with_factory<'b, F, C>(
&'b mut self,
factory: F,
) -> Result<Option<C>>
where
F: FnOnce(
ColumnDescPtr,
WriterPropertiesPtr,
Box<dyn PageWriter + 'b>,
OnCloseColumnChunk<'b>,
) -> Result<C>,
{
self.assert_previous_writer_closed()?;
Ok(match self.next_column_desc() {
Some(column) => {
let props = self.props.clone();
let (buf, on_close) = self.get_on_close();
let page_writer = Box::new(SerializedPageWriter::new(buf));
Some(factory(column, props, page_writer, Box::new(on_close))?)
}
None => None,
})
}

/// Returns the next column writer, if available; otherwise returns `None`.
/// In case of any IO error or Thrift error, or if row group writer has already been
/// closed returns `Err`.
pub fn next_column(&mut self) -> Result<Option<SerializedColumnWriter<'_>>> {
self.next_column_with_factory(|descr, props, page_writer, on_close| {
let column_writer = get_column_writer(descr, props.clone(), page_writer);
let column_writer = get_column_writer(descr, props, page_writer);
Ok(SerializedColumnWriter::new(column_writer, Some(on_close)))
})
}

/// Append an encoded column chunk from another source without decoding it
///
/// This can be used for efficiently concatenating or projecting parquet data,
/// or encoding parquet data to temporary in-memory buffers
///
/// See [`Self::next_column`] for writing data that isn't already encoded
pub fn append_column<R: ChunkReader>(
&mut self,
reader: &R,
mut close: ColumnCloseResult,
) -> Result<()> {
self.assert_previous_writer_closed()?;
let desc = self.next_column_desc().ok_or_else(|| {
general_err!("exhausted columns in SerializedRowGroupWriter")
})?;

let metadata = close.metadata;

if metadata.column_descr() != desc.as_ref() {
return Err(general_err!(
"column descriptor mismatch, expected {:?} got {:?}",
desc,
metadata.column_descr()
));
}

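// Copy the encoded pages verbatim, starting at the chunk's first page (the dictionary page if present, otherwise the first data page).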
let src_dictionary_offset = metadata.dictionary_page_offset();
let src_data_offset = metadata.data_page_offset();
let src_offset = src_dictionary_offset.unwrap_or(src_data_offset);
let src_length = metadata.compressed_size();

let write_offset = self.buf.bytes_written();
let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
let write_length = std::io::copy(&mut read, &mut self.buf)?;

if src_length as u64 != write_length {
return Err(general_err!(
"Failed to splice column data, expected {read_length} got {write_length}"
));
}

let file_offset = self.buf.bytes_written() as i64;

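// Page offsets recorded in the source metadata are relative to the source file; rebase them by subtracting the chunk's start in the source and adding the position where its bytes were just written.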
let map_offset = |x| x - src_offset + write_offset as i64;
let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
.set_compression(metadata.compression())
.set_encodings(metadata.encodings().clone())
.set_file_offset(file_offset)
.set_total_compressed_size(metadata.compressed_size())
.set_total_uncompressed_size(metadata.uncompressed_size())
.set_num_values(metadata.num_values())
.set_data_page_offset(map_offset(src_data_offset))
.set_dictionary_page_offset(src_dictionary_offset.map(map_offset));

if let Some(statistics) = metadata.statistics() {
builder = builder.set_statistics(statistics.clone())
}
close.metadata = builder.build()?;

if let Some(offsets) = close.offset_index.as_mut() {
for location in &mut offsets.page_locations {
location.offset = map_offset(location.offset)
}
}

SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
let (_, on_close) = self.get_on_close();
on_close(close)
}

/// Closes this row group writer and returns row group metadata.
pub fn close(mut self) -> Result<RowGroupMetaDataPtr> {
if self.row_group_metadata.is_none() {
Expand All @@ -516,9 +592,9 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
if let Some(on_close) = self.on_close.take() {
on_close(
metadata,
self.bloom_filters.clone(),
self.column_indexes.clone(),
self.offset_indexes.clone(),
self.bloom_filters,
self.column_indexes,
self.offset_indexes,
)?
}
}
@@ -720,9 +796,11 @@ mod tests {

use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
use crate::column::page::PageReader;
use crate::column::reader::get_typed_column_reader;
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::{BoolType, Int32Type};
use crate::file::reader::ChunkReader;
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
properties::{ReaderProperties, WriterProperties, WriterVersion},
reader::{FileReader, SerializedFileReader, SerializedPageReader},
@@ -1540,4 +1618,91 @@
assert_eq!(s.min_value.as_deref(), Some(1_i32.to_le_bytes().as_ref()));
assert_eq!(s.max_value.as_deref(), Some(3_i32.to_le_bytes().as_ref()));
}

#[test]
fn test_spliced_write() {
@tustvold (Contributor Author) commented:

This is modeled on what #3871 will need to do within the ArrowWriter

let message_type = "
message test_schema {
REQUIRED INT32 i32 (INTEGER(32,true));
REQUIRED INT32 u32 (INTEGER(32,false));
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let props = Arc::new(WriterProperties::builder().build());

let mut file = Vec::with_capacity(1024);
let mut file_writer =
SerializedFileWriter::new(&mut file, schema, props.clone()).unwrap();

let columns = file_writer.descr.columns();
let mut column_state: Vec<(_, Option<ColumnCloseResult>)> = columns
.iter()
.map(|_| (TrackedWrite::new(Vec::with_capacity(1024)), None))
.collect();

let mut column_state_slice = column_state.as_mut_slice();
let mut column_writers = Vec::with_capacity(columns.len());
for c in columns {
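// Peel one `(buf, out)` pair off the slice per iteration: each closure below then
// holds its own disjoint `&mut` borrow that can outlive the loop body, rather than
// re-borrowing all of `column_state` on every pass.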
let ((buf, out), tail) = column_state_slice.split_first_mut().unwrap();
@tustvold (Contributor Author) commented:

This is somewhat obtuse, but is necessary to make the lifetimes work.

column_state_slice = tail;

let page_writer = Box::new(SerializedPageWriter::new(buf));
let col_writer = get_column_writer(c.clone(), props.clone(), page_writer);
column_writers.push(SerializedColumnWriter::new(
col_writer,
Some(Box::new(|on_close| {
*out = Some(on_close);
Ok(())
})),
));
}

let column_data = [[1, 2, 3, 4], [7, 3, 7, 3]];

// Interleaved writing to the column writers
for (writer, batch) in column_writers.iter_mut().zip(column_data) {
let writer = writer.typed::<Int32Type>();
writer.write_batch(&batch, None, None).unwrap();
}

// Close the column writers
for writer in column_writers {
writer.close().unwrap()
}

// Splice column data into a row group
let mut row_group_writer = file_writer.next_row_group().unwrap();
for (write, close) in column_state {
let buf = Bytes::from(write.into_inner().unwrap());
row_group_writer
.append_column(&buf, close.unwrap())
.unwrap();
}
row_group_writer.close().unwrap();
file_writer.close().unwrap();

// Check data was written correctly
let file = Bytes::from(file);
let test_read = |reader: SerializedFileReader<Bytes>| {
let row_group = reader.get_row_group(0).unwrap();

let mut out = [0; 4];
let c1 = row_group.get_column_reader(0).unwrap();
let mut c1 = get_typed_column_reader::<Int32Type>(c1);
c1.read_batch(4, None, None, &mut out).unwrap();
assert_eq!(out, column_data[0]);

let c2 = row_group.get_column_reader(1).unwrap();
let mut c2 = get_typed_column_reader::<Int32Type>(c2);
c2.read_batch(4, None, None, &mut out).unwrap();
assert_eq!(out, column_data[1]);
};

let reader = SerializedFileReader::new(file.clone()).unwrap();
test_read(reader);

let options = ReadOptionsBuilder::new().with_page_index().build();
let reader = SerializedFileReader::new_with_options(file, options).unwrap();
test_read(reader);
}
}
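
The `split_first_mut` pattern flagged in the review comment above ("necessary to make the lifetimes work") can be illustrated outside of parquet. A minimal, hypothetical sketch: each loop iteration peels a disjoint `&mut` slot off a slice so a closure can capture it for later use, which holding several plain `&mut state[i]` borrows of the whole `Vec` would not allow:

```rust
fn main() {
    // One slot of deferred output per "column", mirroring `column_state` in the test.
    let mut state: Vec<Option<usize>> = vec![None, None, None];
    let mut rest = state.as_mut_slice();
    let mut setters = Vec::new();

    for i in 0..3 {
        // `split_first_mut` hands back a `&mut` to the first element that is
        // disjoint from the remainder, so the borrow may live past this iteration.
        let (slot, tail) = rest.split_first_mut().unwrap();
        rest = tail;
        setters.push(move |v: usize| *slot = Some(v + i));
    }

    // Invoke the closures later, each writing through its own captured slot.
    for (j, set) in setters.iter_mut().enumerate() {
        set(j);
    }
    assert_eq!(state, vec![Some(0), Some(2), Some(4)]);
}
```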