
Commit 322745d

Authored by rok, adamreeve, and alamb
Enable parallel writing across row groups when writing encrypted parquet (#8162)
- Closes #8115.
- Closes #8260.
- Closes #8259.

# Rationale for this change

#8029 introduced `pub ArrowWriter.get_column_writers` and `pub ArrowWriter.append_row_group` to enable multi-threaded writing of encrypted Parquet files. However, downstream testing showed that this API is not workable in practice; see #8115.

# What changes are included in this PR?

This PR introduces `pub ArrowWriter.into_serialized_writer` and deprecates `pub ArrowWriter.get_column_writers` and `pub ArrowWriter.append_row_group`. It also makes `ArrowRowGroupWriterFactory` public and adds a `pub ArrowRowGroupWriterFactory.create_column_writers` method.

# Are these changes tested?

This PR includes a DataFusion-inspired test of concurrent writing across columns and row groups, to ensure that parallel writing is and remains possible with the `ArrowWriter` API. We also created a draft PR in DataFusion, apache/datafusion#16738, to exercise multi-threaded writing support downstream.

# Are there any user-facing changes?

See the description of changes above.

---------

Co-authored-by: Adam Reeve <adreeve@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent f4840f6 commit 322745d

4 files changed: +391 −94 lines


parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 19 additions & 1 deletion
```diff
@@ -409,6 +409,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Create a new row group writer and return its column writers.
+    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
     pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
         self.flush()?;
         let in_progress = self
@@ -418,6 +419,7 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Append the given column chunks to the file as a new row group.
+    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
     pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
         let mut row_group_writer = self.writer.next_row_group()?;
         for chunk in chunks {
@@ -426,6 +428,15 @@ impl<W: Write + Send> ArrowWriter<W> {
         row_group_writer.close()?;
         Ok(())
     }
+
+    /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
+    /// This can be useful to provide more control over how files are written.
+    pub fn into_serialized_writer(
+        mut self,
+    ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
+        self.flush()?;
+        Ok((self.writer, self.row_group_writer_factory))
+    }
 }
 
 impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
@@ -851,7 +862,8 @@ impl ArrowRowGroupWriter {
     }
 }
 
-struct ArrowRowGroupWriterFactory {
+/// Factory that creates new column writers for each row group in the Parquet file.
+pub struct ArrowRowGroupWriterFactory {
     schema: SchemaDescriptor,
     arrow_schema: SchemaRef,
     props: WriterPropertiesPtr,
@@ -906,6 +918,12 @@ impl ArrowRowGroupWriterFactory {
         let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
         Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
     }
+
+    /// Create column writers for a new row group.
+    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
+        let rg_writer = self.create_row_group_writer(row_group_index)?;
+        Ok(rg_writer.writers)
+    }
 }
 
 /// Returns the [`ArrowColumnWriter`] for a given schema
```
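Taken together, these changes replace the deprecated two-method API with a single conversion: `into_serialized_writer` hands back the low-level `SerializedFileWriter` plus an `ArrowRowGroupWriterFactory`, and the factory's `create_column_writers` yields independently usable `ArrowColumnWriter`s for each row group. The sketch below shows one way this enables parallel writing. It is a minimal, hypothetical example rather than code from this commit: the `write_batches` helper and the one-batch-per-row-group layout are assumptions, a flat schema is assumed (so leaf columns map one-to-one to top-level fields), and writer properties such as encryption settings are omitted. It leans on the existing public helpers `compute_leaves` and `ArrowColumnChunk::append_to_row_group`.

```rust
use std::fs::File;
use std::thread;

use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk};
use parquet::errors::Result;

/// Hypothetical helper: writes one row group per input batch,
/// encoding the columns of each row group in parallel.
fn write_batches(file: File, batches: Vec<RecordBatch>) -> Result<()> {
    // Assumes `batches` is non-empty and all batches share one schema.
    let schema = batches[0].schema();
    let writer = ArrowWriter::try_new(file, schema.clone(), None)?;
    // New in this commit: split the high-level writer into the low-level
    // SerializedFileWriter plus a factory for per-row-group column writers.
    let (mut file_writer, factory) = writer.into_serialized_writer()?;

    for (rg_index, batch) in batches.into_iter().enumerate() {
        let col_writers = factory.create_column_writers(rg_index)?;

        // Encode each leaf column on its own scoped thread. Flat schema
        // assumed, so leaf column i corresponds to top-level field i.
        let chunks: Vec<ArrowColumnChunk> = thread::scope(|s| {
            let handles: Vec<_> = col_writers
                .into_iter()
                .enumerate()
                .map(|(i, mut col_writer)| {
                    let field = schema.field(i).clone();
                    let column = batch.column(i).clone();
                    s.spawn(move || -> Result<ArrowColumnChunk> {
                        for leaf in compute_leaves(&field, &column)? {
                            col_writer.write(&leaf)?;
                        }
                        col_writer.close()
                    })
                })
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        })?;

        // Appending the encoded chunks to the file stays sequential.
        let mut rg_writer = file_writer.next_row_group()?;
        for chunk in chunks {
            chunk.append_to_row_group(&mut rg_writer)?;
        }
        rg_writer.close()?;
    }

    file_writer.close()?;
    Ok(())
}
```

Only the steps that touch the file itself (`next_row_group`, appending chunks, and closing) remain sequential; all of the column encoding work fans out across threads, which is what the previous `get_column_writers`/`append_row_group` pair attempted but could not reliably support (#8115).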

parquet/src/arrow/async_writer/mod.rs

Lines changed: 2 additions & 65 deletions
```diff
@@ -61,7 +61,7 @@ mod store;
 pub use store::*;
 
 use crate::{
-    arrow::arrow_writer::{ArrowColumnChunk, ArrowColumnWriter, ArrowWriterOptions},
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::{metadata::RowGroupMetaData, properties::WriterProperties},
@@ -288,34 +288,16 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
 
         Ok(())
     }
-
-    /// Create a new row group writer and return its column writers.
-    pub async fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
-        let before = self.sync_writer.flushed_row_groups().len();
-        let writers = self.sync_writer.get_column_writers()?;
-        if before != self.sync_writer.flushed_row_groups().len() {
-            self.do_write().await?;
-        }
-        Ok(writers)
-    }
-
-    /// Append the given column chunks to the file as a new row group.
-    pub async fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
-        self.sync_writer.append_row_group(chunks)?;
-        self.do_write().await
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use std::sync::Arc;
 
-    use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
-    use crate::arrow::arrow_writer::compute_leaves;
-
     use super::*;
 
     fn get_test_reader() -> ParquetRecordBatchReader {
@@ -349,51 +331,6 @@ mod tests {
         assert_eq!(to_write, read);
     }
 
-    #[tokio::test]
-    async fn test_async_arrow_group_writer() {
-        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6])) as ArrayRef;
-        let to_write_record = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        let mut buffer = Vec::new();
-        let mut writer =
-            AsyncArrowWriter::try_new(&mut buffer, to_write_record.schema(), None).unwrap();
-
-        // Use classic API
-        writer.write(&to_write_record).await.unwrap();
-
-        let mut writers = writer.get_column_writers().await.unwrap();
-        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
-        let to_write_arrow_group = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        for (field, column) in to_write_arrow_group
-            .schema()
-            .fields()
-            .iter()
-            .zip(to_write_arrow_group.columns())
-        {
-            for leaf in compute_leaves(field.as_ref(), column).unwrap() {
-                writers[0].write(&leaf).unwrap();
-            }
-        }
-
-        let columns: Vec<_> = writers.into_iter().map(|w| w.close().unwrap()).collect();
-        // Append the arrow group as a new row group. Flush in progress
-        writer.append_row_group(columns).await.unwrap();
-        writer.close().await.unwrap();
-
-        let buffer = Bytes::from(buffer);
-        let mut reader = ParquetRecordBatchReaderBuilder::try_new(buffer)
-            .unwrap()
-            .build()
-            .unwrap();
-
-        let col = Arc::new(Int64Array::from_iter_values([4, 5, 6, 1, 2, 3])) as ArrayRef;
-        let expected = RecordBatch::try_from_iter([("col", col)]).unwrap();
-
-        let read = reader.next().unwrap().unwrap();
-        assert_eq!(expected, read);
-    }
-
     // Read the data from the test file and write it by the async writer and sync writer.
     // And then compares the results of the two writers.
     #[tokio::test]
```
