diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 784959fbd9fb..0e4141c8e455 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -26,17 +26,16 @@
 //! # #[tokio::main(flavor="current_thread")]
 //! # async fn main() {
 //! #
-//! use std::sync::Arc;
-//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
-//! use bytes::Bytes;
-//! use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
-//!
+//! # use std::sync::Arc;
+//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
+//! # use bytes::Bytes;
+//! # use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
+//! #
 //! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
 //! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
 //!
 //! let mut buffer = Vec::new();
-//! let mut writer =
-//!     AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), 0, None).unwrap();
+//! let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
 //! writer.write(&to_write).await.unwrap();
 //! writer.close().await.unwrap();
 //!
@@ -62,19 +61,15 @@ use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
-/// Async arrow writer.
+/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
 ///
-/// It is implemented based on the sync writer [`ArrowWriter`] with an inner buffer.
-/// The buffered data will be flushed to the writer provided by caller when the
-/// buffer's threshold is exceeded.
+/// ## Memory Usage
 ///
-/// ## Memory Limiting
-///
-/// The nature of parquet forces buffering of an entire row group before it can be flushed
-/// to the underlying writer. This buffering may exceed the configured buffer size
-/// of [`AsyncArrowWriter`]. Memory usage can be limited by prematurely flushing the row group,
-/// although this will have implications for file size and query performance. See [ArrowWriter]
-/// for more information.
+/// The columnar nature of parquet forces buffering data for an entire row group. As such,
+/// [`AsyncArrowWriter`] uses [`ArrowWriter`] to encode each row group in memory, before
+/// flushing it to the provided [`AsyncWrite`]. Memory usage can be limited by prematurely
+/// flushing the row group, although this will have implications for file size and query
+/// performance. See [`ArrowWriter`] for more information.
 ///
 /// ```no_run
 /// # use tokio::fs::File;
@@ -96,50 +91,30 @@ pub struct AsyncArrowWriter<W> {
 
     /// Async writer provided by caller
     async_writer: W,
-
-    /// Trigger forced flushing once buffer size reaches this value
-    buffer_size: usize,
 }
 
 impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
-    /// Try to create a new Async Arrow Writer.
-    ///
-    /// `buffer_size` determines the minimum number of bytes to buffer before flushing
-    /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
-    /// force buffering of data in excess of this within the underlying [`ArrowWriter`].
-    /// See the documentation on [`ArrowWriter`] for more details
+    /// Try to create a new Async Arrow Writer
     pub fn try_new(
         writer: W,
         arrow_schema: SchemaRef,
-        buffer_size: usize,
         props: Option<WriterProperties>,
     ) -> Result<Self> {
         let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
-        Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
+        Self::try_new_with_options(writer, arrow_schema, options)
     }
 
-    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
-    ///
-    /// `buffer_size` determines the minimum number of bytes to buffer before flushing
-    /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
-    /// force buffering of data in excess of this within the underlying [`ArrowWriter`].
-    /// See the documentation on [`ArrowWriter`] for more details
+    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]
    pub fn try_new_with_options(
         writer: W,
         arrow_schema: SchemaRef,
-        buffer_size: usize,
         options: ArrowWriterOptions,
     ) -> Result<Self> {
-        let sync_writer = ArrowWriter::try_new_with_options(
-            Vec::with_capacity(buffer_size),
-            arrow_schema,
-            options,
-        )?;
+        let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;
 
         Ok(Self {
             sync_writer,
             async_writer: writer,
-            buffer_size,
         })
     }
 
@@ -168,14 +143,18 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
-    /// After every sync write by the inner [ArrowWriter], the inner buffer will be
-    /// checked and flush if at least half full
+    /// After the inner [`ArrowWriter`] has written the batch, any newly completed
+    /// row groups are flushed to the underlying [`AsyncWrite`]
     pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        let before = self.sync_writer.flushed_row_groups().len();
         self.sync_writer.write(batch)?;
-        self.try_flush(false).await
+        if before != self.sync_writer.flushed_row_groups().len() {
+            self.do_write().await?;
+        }
+        Ok(())
     }
 
     /// Flushes all buffered rows into a new row group
     pub async fn flush(&mut self) -> Result<()> {
         self.sync_writer.flush()?;
-        self.try_flush(false).await?;
+        self.do_write().await?;
         Ok(())
     }
 
@@ -194,19 +173,15 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         let metadata = self.sync_writer.finish()?;
 
         // Force to flush the remaining data.
-        self.try_flush(true).await?;
+        self.do_write().await?;
 
         self.async_writer.shutdown().await?;
         Ok(metadata)
     }
 
-    /// Flush the buffered data into the `async_writer`
-    async fn try_flush(&mut self, force: bool) -> Result<()> {
+    /// Flush the data written by `sync_writer` into the `async_writer`
+    async fn do_write(&mut self) -> Result<()> {
         let buffer = self.sync_writer.inner_mut();
-        if !force && (buffer.is_empty() || buffer.len() < self.buffer_size) {
-            // no need to flush
-            return Ok(());
-        }
 
         self.async_writer
             .write_all(buffer.as_slice())
@@ -254,8 +229,7 @@ mod tests {
         let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
 
         let mut buffer = Vec::new();
-        let mut writer =
-            AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), 0, None).unwrap();
+        let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
         writer.write(&to_write).await.unwrap();
         writer.close().await.unwrap();
 
@@ -283,7 +257,6 @@ mod tests {
         let mut async_writer = AsyncArrowWriter::try_new(
             &mut async_buffer,
             reader.schema(),
-            1024,
             Some(write_props.clone()),
         )
         .unwrap();
@@ -345,54 +318,6 @@ mod tests {
         }
     }
-    #[tokio::test]
-    async fn test_async_writer_with_buffer_flush_threshold() {
-        let write_props = WriterProperties::builder()
-            .set_max_row_group_size(2048)
-            .build();
-        let expect_encode_size = {
-            let reader = get_test_reader();
-            let mut buffer = Vec::new();
-            let mut async_writer = AsyncArrowWriter::try_new(
-                &mut buffer,
-                reader.schema(),
-                0,
-                Some(write_props.clone()),
-            )
-            .unwrap();
-            for record_batch in reader {
-                let record_batch = record_batch.unwrap();
-                async_writer.write(&record_batch).await.unwrap();
-            }
-            async_writer.close().await.unwrap();
-            buffer.len()
-        };
-
-        let test_buffer_flush_thresholds = vec![0, 1024, 40 * 1024, 50 * 1024, 100 * 1024];
-
-        for buffer_flush_threshold in test_buffer_flush_thresholds {
-            let reader = get_test_reader();
-            let mut test_async_sink = TestAsyncSink {
-                sink: Vec::new(),
-                min_accept_bytes: buffer_flush_threshold,
-                expect_total_bytes: expect_encode_size,
-            };
-            let mut async_writer = AsyncArrowWriter::try_new(
-                &mut test_async_sink,
-                reader.schema(),
-                buffer_flush_threshold * 2,
-                Some(write_props.clone()),
-            )
-            .unwrap();
-
-            for record_batch in reader {
-                let record_batch = record_batch.unwrap();
-                async_writer.write(&record_batch).await.unwrap();
-            }
-            async_writer.close().await.unwrap();
-        }
-    }
-
     #[tokio::test]
     async fn test_async_writer_file() {
         let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
         let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
@@ -406,7 +331,7 @@ mod tests {
         let temp = tempfile::tempfile().unwrap();
         let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
 
-        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), 0, None).unwrap();
+        let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
         writer.write(&to_write).await.unwrap();
         writer.close().await.unwrap();
 
@@ -430,36 +355,33 @@ mod tests {
         // build a record batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
 
-        for buffer_size in [0, 8, 1024] {
-            let temp = tempfile::tempfile().unwrap();
-            let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
-            let mut writer =
-                AsyncArrowWriter::try_new(file, batch.schema(), buffer_size, None).unwrap();
-
-            // starts empty
-            assert_eq!(writer.in_progress_size(), 0);
-            assert_eq!(writer.in_progress_rows(), 0);
-            assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
-            writer.write(&batch).await.unwrap();
-
-            // updated on write
-            let initial_size = writer.in_progress_size();
-            assert!(initial_size > 0);
-            assert_eq!(writer.in_progress_rows(), batch.num_rows());
-
-            // updated on second write
-            writer.write(&batch).await.unwrap();
-            assert!(writer.in_progress_size() > initial_size);
-            assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
-
-            // in progress tracking is cleared, but the overall data written is updated
-            let pre_flush_bytes_written = writer.bytes_written();
-            writer.flush().await.unwrap();
-            assert_eq!(writer.in_progress_size(), 0);
-            assert_eq!(writer.in_progress_rows(), 0);
-            assert!(writer.bytes_written() > pre_flush_bytes_written);
-
-            writer.close().await.unwrap();
-        }
+        let temp = tempfile::tempfile().unwrap();
+        let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
+        let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();
+
+        // starts empty
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+        assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
+        writer.write(&batch).await.unwrap();
+
+        // updated on write
+        let initial_size = writer.in_progress_size();
+        assert!(initial_size > 0);
+        assert_eq!(writer.in_progress_rows(), batch.num_rows());
+
+        // updated on second write
+        writer.write(&batch).await.unwrap();
+        assert!(writer.in_progress_size() > initial_size);
+        assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
+
+        // in progress tracking is cleared, but the overall data written is updated
+        let pre_flush_bytes_written = writer.bytes_written();
+        writer.flush().await.unwrap();
+        assert_eq!(writer.in_progress_size(), 0);
+        assert_eq!(writer.in_progress_rows(), 0);
+        assert!(writer.bytes_written() > pre_flush_bytes_written);
+
+        writer.close().await.unwrap();
     }
 }
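For downstream users the migration is mechanical: drop the `buffer_size` argument and, where memory needs to be bounded, control row group size via `WriterProperties` instead. The following is a minimal sketch of the API after this change, not part of the patch itself; it assumes the `parquet` crate's `async` feature, a `tokio` runtime with the `fs` feature enabled, and an illustrative output path `example.parquet`:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let col = Arc::new(Int64Array::from_iter_values(0..25_000)) as ArrayRef;
    let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();

    // Instead of the removed `buffer_size` knob, bound peak memory by closing
    // row groups early: each completed row group is written to the AsyncWrite
    // and its encoding buffer released. Smaller row groups trade file size and
    // query performance for lower memory usage, per the ArrowWriter docs.
    let props = WriterProperties::builder()
        .set_max_row_group_size(10_000)
        .build();

    let file = tokio::fs::File::create("example.parquet").await.unwrap();
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();

    // `write` only performs async I/O when the underlying ArrowWriter has
    // completed one or more row groups (here: twice for 25k rows / 10k max)
    writer.write(&batch).await.unwrap();

    // Optionally cut the in-progress rows into a row group early
    writer.flush().await.unwrap();

    writer.close().await.unwrap();
}
```

Note that `close` already flushes any remaining buffered data, so the explicit `flush` is only needed when a row group boundary should be forced before the writer is finished.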