Commit 1eb62bd

Unify API for writing column chunks / row groups in parallel (#8582)
# Which issue does this PR close?

- Closes #8389.

# Rationale for this change

Simplify the API surface and provide only one way to write column chunks and row groups in parallel.

# What changes are included in this PR?

* Make the `ArrowRowGroupWriterFactory` constructor public and simplify it to remove arguments that are available from the `SerializedFileWriter`.
* Update the `ArrowColumnWriter` example and test code to use the `ArrowRowGroupWriterFactory`.
* Deprecate `parquet::arrow::arrow_writer::get_column_writers` and `parquet::arrow::arrow_writer::ArrowWriter::into_serialized_writer`.

# Are these changes tested?

Yes, covered by existing tests.

# Are there any user-facing changes?

Yes, this deprecates existing public methods.
1 parent: c94698c
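In code, the unified API this commit converges on looks roughly as follows. This is a minimal sketch based on the benchmark and doc-example changes below, not part of the diff itself: it assumes a flat schema (one leaf column per top-level field) and default writer properties, and `write_one_row_group` is an illustrative name.

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use parquet::arrow::ArrowSchemaConverter;
use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

/// Write `batch` as a single row group, constructing the file writer and the
/// row-group writer factory directly instead of going through `ArrowWriter`.
fn write_one_row_group(batch: &RecordBatch) -> Result<Vec<u8>> {
    let props = Arc::new(WriterProperties::default());
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(batch.schema_ref())?;
    // Write to memory here; this could equally be a File.
    let mut out = Vec::new();
    let mut writer = SerializedFileWriter::new(&mut out, parquet_schema.root_schema_ptr(), props)?;

    // The factory now takes only the file writer and the Arrow schema;
    // the Parquet schema and properties are read from the writer itself.
    let factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());
    let mut col_writers = factory.create_column_writers(0)?;

    // Encode each column. The writers are independent, so this loop could
    // instead fan out to worker threads (flat schema: one leaf per field).
    for ((col_writer, field), column) in col_writers
        .iter_mut()
        .zip(batch.schema_ref().fields())
        .zip(batch.columns())
    {
        for leaf in compute_leaves(field, column)? {
            col_writer.write(&leaf)?;
        }
    }

    // Append the encoded chunks as row group 0 and finalize the file.
    let mut row_group = writer.next_row_group()?;
    for col_writer in col_writers {
        col_writer.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    writer.close()?;
    Ok(out)
}
```

Note that the factory no longer takes a `SchemaDescriptor` or `WriterPropertiesPtr`: both are read from the `SerializedFileWriter` passed to `ArrowRowGroupWriterFactory::new`.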

File tree

4 files changed, +96 −90 lines

parquet/benches/arrow_writer.rs

Lines changed: 10 additions & 4 deletions

```diff
@@ -19,7 +19,7 @@
 extern crate criterion;

 use criterion::{Bencher, Criterion, Throughput};
-use parquet::arrow::arrow_writer::compute_leaves;
+use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
 use parquet::basic::{Compression, ZstdLevel};

 extern crate arrow;
@@ -33,8 +33,10 @@ use arrow::datatypes::*;
 use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
 use arrow::{record_batch::RecordBatch, util::data_gen::*};
 use arrow_array::RecordBatchOptions;
+use parquet::arrow::ArrowSchemaConverter;
+use parquet::errors::Result;
 use parquet::file::properties::{WriterProperties, WriterVersion};
-use parquet::{arrow::ArrowWriter, errors::Result};
+use parquet::file::writer::SerializedFileWriter;

 fn create_primitive_bench_batch(
     size: usize,
@@ -341,8 +343,12 @@ fn write_batch_with_option(
     props: Option<WriterProperties>,
 ) -> Result<()> {
     let mut file = Empty::default();
-    let writer = ArrowWriter::try_new(&mut file, batch.schema(), props)?;
-    let (_, row_group_writer_factory) = writer.into_serialized_writer()?;
+    let props = Arc::new(props.unwrap_or_default());
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(batch.schema_ref())?;
+    let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?;
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());

     bench.iter(|| {
         let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap();
```

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 58 additions & 72 deletions

```diff
@@ -48,7 +48,7 @@ use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
-use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
 use levels::{ArrayLevels, calculate_array_levels};

 mod byte_array;
@@ -252,7 +252,7 @@ impl<W: Write + Send> ArrowWriter<W> {
             SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;

         let row_group_writer_factory =
-            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
+            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());

         Ok(Self {
             writer: file_writer,
@@ -423,7 +423,10 @@
     }

     /// Create a new row group writer and return its column writers.
-    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+    #[deprecated(
+        since = "56.2.0",
+        note = "Use `ArrowRowGroupWriterFactory` instead, see `ArrowColumnWriter` for an example"
+    )]
     pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
         self.flush()?;
         let in_progress = self
@@ -433,7 +436,10 @@
     }

     /// Append the given column chunks to the file as a new row group.
-    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+    #[deprecated(
+        since = "56.2.0",
+        note = "Use `SerializedFileWriter` directly instead, see `ArrowColumnWriter` for an example"
+    )]
     pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
         let mut row_group_writer = self.writer.next_row_group()?;
         for chunk in chunks {
@@ -445,6 +451,10 @@

     /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
     /// This can be useful to provide more control over how files are written.
+    #[deprecated(
+        since = "57.0.0",
+        note = "Construct a `SerializedFileWriter` and `ArrowRowGroupWriterFactory` directly instead"
+    )]
     pub fn into_serialized_writer(
         mut self,
     ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
@@ -693,6 +703,8 @@ impl ArrowColumnChunk {

 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
 ///
+/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`];
+///
 /// Note: This is a low-level interface for applications that require
 /// fine-grained control of encoding (e.g. encoding using multiple threads),
 /// see [`ArrowWriter`] for a higher-level interface
@@ -704,7 +716,7 @@
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
 /// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
+/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
 /// # use parquet::file::properties::WriterProperties;
 /// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 /// #
@@ -720,8 +732,17 @@
 ///     .convert(&schema)
 ///     .unwrap();
 ///
-/// // Create writers for each of the leaf columns
-/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// // write to memory in the example, but this could be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+///     .unwrap();
+///
+/// // Create a factory for building Arrow column writers
+/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
+/// // Create column writers for the 0th row group
+/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
 ///
 /// // Spawn a worker thread for each column
 /// //
@@ -744,13 +765,6 @@
 ///     })
 ///     .collect();
 ///
-/// // Create parquet writer
-/// let root_schema = parquet_schema.root_schema_ptr();
-/// // write to memory in the example, but this could be a File
-/// let mut out = Vec::with_capacity(1024);
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
-///     .unwrap();
-///
 /// // Start row group
 /// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
 ///     .next_row_group()
@@ -894,69 +908,65 @@ impl ArrowRowGroupWriter {

 /// Factory that creates new column writers for each row group in the Parquet file.
 pub struct ArrowRowGroupWriterFactory {
-    schema: SchemaDescriptor,
+    schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     props: WriterPropertiesPtr,
     #[cfg(feature = "encryption")]
     file_encryptor: Option<Arc<FileEncryptor>>,
 }

 impl ArrowRowGroupWriterFactory {
-    #[cfg(feature = "encryption")]
-    fn new<W: Write + Send>(
+    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
+    pub fn new<W: Write + Send>(
         file_writer: &SerializedFileWriter<W>,
-        schema: SchemaDescriptor,
         arrow_schema: SchemaRef,
-        props: WriterPropertiesPtr,
     ) -> Self {
+        let schema = Arc::clone(file_writer.schema_descr_ptr());
+        let props = Arc::clone(file_writer.properties());
         Self {
             schema,
             arrow_schema,
             props,
+            #[cfg(feature = "encryption")]
             file_encryptor: file_writer.file_encryptor(),
         }
     }

-    #[cfg(not(feature = "encryption"))]
-    fn new<W: Write + Send>(
-        _file_writer: &SerializedFileWriter<W>,
-        schema: SchemaDescriptor,
-        arrow_schema: SchemaRef,
-        props: WriterPropertiesPtr,
-    ) -> Self {
-        Self {
-            schema,
-            arrow_schema,
-            props,
+    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
+        let writers = self.create_column_writers(row_group_index)?;
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+    }
+
+    /// Create column writers for a new row group.
+    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
+        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
+        let mut leaves = self.schema.columns().iter();
+        let column_factory = self.column_writer_factory(row_group_index);
+        for field in &self.arrow_schema.fields {
+            column_factory.get_arrow_column_writer(
+                field.data_type(),
+                &self.props,
+                &mut leaves,
+                &mut writers,
+            )?;
         }
+        Ok(writers)
     }

     #[cfg(feature = "encryption")]
-    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers_with_encryptor(
-            &self.schema,
-            &self.props,
-            &self.arrow_schema,
-            self.file_encryptor.clone(),
-            row_group_index,
-        )?;
-        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
+        ArrowColumnWriterFactory::new()
+            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
     }

     #[cfg(not(feature = "encryption"))]
-    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
-        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
-    }
-
-    /// Create column writers for a new row group.
-    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
-        let rg_writer = self.create_row_group_writer(row_group_index)?;
-        Ok(rg_writer.writers)
+    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
+        ArrowColumnWriterFactory::new()
     }
 }

 /// Returns [`ArrowColumnWriter`]s for each column in a given schema
+#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
 pub fn get_column_writers(
     parquet: &SchemaDescriptor,
     props: &WriterPropertiesPtr,
@@ -976,30 +986,6 @@ pub fn get_column_writers(
     Ok(writers)
 }

-/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
-#[cfg(feature = "encryption")]
-fn get_column_writers_with_encryptor(
-    parquet: &SchemaDescriptor,
-    props: &WriterPropertiesPtr,
-    arrow: &SchemaRef,
-    file_encryptor: Option<Arc<FileEncryptor>>,
-    row_group_index: usize,
-) -> Result<Vec<ArrowColumnWriter>> {
-    let mut writers = Vec::with_capacity(arrow.fields.len());
-    let mut leaves = parquet.columns().iter();
-    let column_factory =
-        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
-    for field in &arrow.fields {
-        column_factory.get_arrow_column_writer(
-            field.data_type(),
-            props,
-            &mut leaves,
-            &mut writers,
-        )?;
-    }
-    Ok(writers)
-}
-
 /// Creates [`ArrowColumnWriter`] instances
 struct ArrowColumnWriterFactory {
     #[cfg(feature = "encryption")]
```
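Because `create_column_writers` is now public and takes an explicit row-group index, multiple row groups can be written by looping over indices. Below is a sketch under the same assumptions as the example above (flat schema, one leaf per top-level column); `append_row_groups` is a hypothetical helper, and it assumes no row groups were written before it is called, so the loop index matches the file's row-group ordinal.

```rust
use std::io::Write;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
use parquet::errors::Result;
use parquet::file::writer::SerializedFileWriter;

/// Append each batch as its own row group, creating the column writers
/// through the factory with an explicit row-group index.
fn append_row_groups<W: Write + Send>(
    writer: &mut SerializedFileWriter<W>,
    schema: SchemaRef,
    batches: &[RecordBatch],
) -> Result<()> {
    // The factory clones the schema and properties out of the writer,
    // so it holds no borrow and the writer stays usable below.
    let factory = ArrowRowGroupWriterFactory::new(writer, Arc::clone(&schema));
    for (rg_idx, batch) in batches.iter().enumerate() {
        // The index ties per-row-group state (e.g. the file encryptor when
        // the `encryption` feature is enabled) to the right row group.
        let mut col_writers = factory.create_column_writers(rg_idx)?;
        for ((col_writer, field), column) in col_writers
            .iter_mut()
            .zip(schema.fields())
            .zip(batch.columns())
        {
            for leaf in compute_leaves(field, column)? {
                col_writer.write(&leaf)?;
            }
        }
        // Close the column writers and append their chunks as one row group.
        let mut row_group = writer.next_row_group()?;
        for col_writer in col_writers {
            col_writer.close()?.append_to_row_group(&mut row_group)?;
        }
        row_group.close()?;
    }
    Ok(())
}
```

The encoding loop here is sequential for brevity; as in the doc example above, each `ArrowColumnWriter` could instead be moved to its own worker thread.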

parquet/src/file/writer.rs

Lines changed: 6 additions & 0 deletions

```diff
@@ -392,6 +392,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         &self.descr
     }

+    /// Returns a reference to schema descriptor Arc.
+    #[cfg(feature = "arrow")]
+    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
+        &self.descr
+    }
+
     /// Returns a reference to the writer properties
     pub fn properties(&self) -> &WriterPropertiesPtr {
         &self.props
```

parquet/tests/encryption/encryption_async.rs

Lines changed: 22 additions & 14 deletions

```diff
@@ -24,13 +24,14 @@ use crate::encryption_util::{
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
 use futures::TryStreamExt;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::arrow_writer::{
     ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
     ArrowWriterOptions, compute_leaves,
 };
-use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
+use parquet::arrow::{
+    ArrowSchemaConverter, ArrowWriter, AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
+};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
@@ -696,18 +697,22 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() {
         }
     });

-    let props = Some(
+    let props = Arc::new(
         WriterPropertiesBuilder::default()
             .with_file_encryption_properties(file_encryption_properties)
             .build(),
     );
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(schema)
+        .unwrap();

     // Create a temporary file to write the encrypted data
     let temp_file = tempfile::tempfile().unwrap();
-    let arrow_writer =
-        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();

-    let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap();
+    let writer =
+        SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(schema));
     let max_row_groups = 1;

     let (serialize_tx, serialize_rx) =
@@ -757,19 +762,22 @@ async fn test_multi_threaded_encrypted_writing() {
         read_encrypted_file(&file, decryption_properties.clone()).unwrap();
     let schema = metadata.schema().clone();

-    let props = Some(
+    let props = Arc::new(
         WriterPropertiesBuilder::default()
             .with_file_encryption_properties(file_encryption_properties)
             .build(),
     );

+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+
     // Create a temporary file to write the encrypted data
     let temp_file = tempfile::tempfile().unwrap();
-    let writer =
-        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
-
-    let (mut serialized_file_writer, row_group_writer_factory) =
-        writer.into_serialized_writer().unwrap();
+    let mut writer =
+        SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));

     let (serialize_tx, mut serialize_rx) =
         tokio::sync::mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(1);
@@ -805,7 +813,7 @@
     // Append the finalized row groups to the SerializedFileWriter
     while let Some(task) = serialize_rx.recv().await {
         let (arrow_column_chunks, _) = task.await.unwrap().unwrap();
-        let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
         for chunk in arrow_column_chunks {
             chunk.append_to_row_group(&mut row_group_writer).unwrap();
         }
@@ -815,7 +823,7 @@
     // Wait for data generator and serialization task to finish
     data_generator.await.unwrap();
     launch_serialization_task.await.unwrap();
-    let metadata = serialized_file_writer.close().unwrap();
+    let metadata = writer.close().unwrap();

     // Close the file writer which writes the footer
     assert_eq!(metadata.file_metadata().num_rows(), 50);
```
