Bring some methods over from ArrowWriter to the async version (#5251)
* Bring some methods over from ArrowWriter to the async version

* Add bytes_written

* Fix typo

* Add bytes_written test
AdamGS authored Feb 24, 2024
1 parent 1e0a264 commit 4325900
Showing 3 changed files with 89 additions and 3 deletions.
10 changes: 9 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -179,6 +179,11 @@ impl<W: Write + Send> ArrowWriter<W> {
             .unwrap_or_default()
     }
 
+    /// Returns the number of bytes written by this instance
+    pub fn bytes_written(&self) -> usize {
+        self.writer.bytes_written()
+    }
+
     /// Encodes the provided [`RecordBatch`]
     ///
     /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
@@ -2748,6 +2753,7 @@ mod tests {
         // starts empty
         assert_eq!(writer.in_progress_size(), 0);
         assert_eq!(writer.in_progress_rows(), 0);
+        assert_eq!(writer.bytes_written(), 4); // Initial header
         writer.write(&batch).unwrap();
 
         // updated on write
@@ -2760,10 +2766,12 @@ mod tests {
         assert!(writer.in_progress_size() > initial_size);
         assert_eq!(writer.in_progress_rows(), 10);
 
-        // cleared on flush
+        // in progress tracking is cleared, but the overall data written is updated
+        let pre_flush_bytes_written = writer.bytes_written();
         writer.flush().unwrap();
         assert_eq!(writer.in_progress_size(), 0);
         assert_eq!(writer.in_progress_rows(), 0);
+        assert!(writer.bytes_written() > pre_flush_bytes_written);
 
         writer.close().unwrap();
     }
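The `bytes_written() == 4` assertion above reflects the 4-byte magic `PAR1` that every Parquet file begins with; the writer emits it as soon as it is constructed, before any batch is written. A minimal sketch (the file path is hypothetical, not part of this commit) checking those same bytes on disk:

use std::fs::File;
use std::io::Read;

// Sketch: confirm the 4-byte "PAR1" magic that accounts for the initial
// bytes_written() == 4 asserted in the tests above. Path is hypothetical.
fn check_parquet_magic() -> std::io::Result<()> {
    let mut file = File::open("data.parquet")?;
    let mut magic = [0u8; 4];
    file.read_exact(&mut magic)?;
    assert_eq!(&magic, b"PAR1");
    Ok(())
}
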
77 changes: 75 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -57,7 +57,7 @@ use crate::{
     arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
-    file::properties::WriterProperties,
+    file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
     format::{FileMetaData, KeyValue},
 };
 use arrow_array::RecordBatch;
@@ -130,6 +130,26 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         })
     }
 
+    /// Returns metadata for any flushed row groups
+    pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
+        self.sync_writer.flushed_row_groups()
+    }
+
+    /// Returns the estimated length in bytes of the current in progress row group
+    pub fn in_progress_size(&self) -> usize {
+        self.sync_writer.in_progress_size()
+    }
+
+    /// Returns the number of rows buffered in the in progress row group
+    pub fn in_progress_rows(&self) -> usize {
+        self.sync_writer.in_progress_rows()
+    }
+
+    /// Returns the number of bytes written by this instance
+    pub fn bytes_written(&self) -> usize {
+        self.sync_writer.bytes_written()
+    }
+
     /// Enqueues the provided `RecordBatch` to be written
     ///
     /// After every sync write by the inner [ArrowWriter], the inner buffer will be
@@ -144,6 +164,14 @@
         .await
     }
 
+    /// Flushes all buffered rows into a new row group
+    pub async fn flush(&mut self) -> Result<()> {
+        self.sync_writer.flush()?;
+        Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;
+
+        Ok(())
+    }
+
     /// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
     ///
     /// This method allows to append metadata after [`RecordBatch`]es are written.
@@ -227,7 +255,8 @@ impl Write for SharedBuffer {
 
 #[cfg(test)]
 mod tests {
-    use arrow_array::{ArrayRef, BinaryArray, Int64Array, RecordBatchReader};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
     use bytes::Bytes;
     use tokio::pin;
 
@@ -416,4 +445,48 @@ mod tests {
 
         assert_eq!(to_write, read);
     }
+
+    #[tokio::test]
+    async fn in_progress_accounting() {
+        // define schema
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+
+        // create some data
+        let a = Int32Array::from_value(0_i32, 512);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
+
+        for buffer_size in [0, 8, 1024] {
+            let temp = tempfile::tempfile().unwrap();
+            let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
+            let mut writer =
+                AsyncArrowWriter::try_new(file, batch.schema(), buffer_size, None).unwrap();
+
+            // starts empty
+            assert_eq!(writer.in_progress_size(), 0);
+            assert_eq!(writer.in_progress_rows(), 0);
+            assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
+            writer.write(&batch).await.unwrap();
+
+            // updated on write
+            let initial_size = writer.in_progress_size();
+            assert!(initial_size > 0);
+            assert_eq!(writer.in_progress_rows(), batch.num_rows());
+
+            // updated on second write
+            writer.write(&batch).await.unwrap();
+            assert!(writer.in_progress_size() > initial_size);
+            assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
+
+            // in progress tracking is cleared, but the overall data written is updated
+            let pre_flush_bytes_written = writer.bytes_written();
+            writer.flush().await.unwrap();
+            assert_eq!(writer.in_progress_size(), 0);
+            assert_eq!(writer.in_progress_rows(), 0);
+            assert!(writer.bytes_written() > pre_flush_bytes_written);
+
+            writer.close().await.unwrap();
+        }
+    }
 }
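
Taken together, the methods mirrored onto AsyncArrowWriter make size-based row group management possible from async code. A sketch of that kind of usage, assuming this commit's API; the 1 MiB threshold, loop count, and output path are illustrative, not from the commit:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::async_writer::AsyncArrowWriter;

// Sketch: monitor writer progress and force a row group boundary once the
// in-progress data grows past an arbitrary threshold.
async fn write_with_accounting() -> parquet::errors::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from_value(0, 1024));
    let batch = RecordBatch::try_from_iter([("a", col)]).unwrap();

    let file = tokio::fs::File::create("out.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), 0, None)?;

    for _ in 0..100 {
        writer.write(&batch).await?;
        // New in this commit: size accounting on the async writer
        if writer.in_progress_size() > 1024 * 1024 {
            writer.flush().await?; // also new: explicit row group flush
        }
    }
    println!(
        "{} row groups flushed, {} bytes written so far",
        writer.flushed_row_groups().len(),
        writer.bytes_written()
    );
    writer.close().await?;
    Ok(())
}
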
5 changes: 5 additions & 0 deletions parquet/src/file/writer.rs
@@ -394,6 +394,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
 
         self.buf.into_inner()
     }
+
+    /// Returns the number of bytes written to this instance
+    pub fn bytes_written(&self) -> usize {
+        self.buf.bytes_written()
+    }
 }
 
 /// Parquet row group writer API.
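The `bytes_written` chain bottoms out here: `SerializedFileWriter` delegates to `self.buf.bytes_written()`, suggesting its buffer counts bytes as they pass through rather than seeking the sink. A minimal sketch of such a counting `Write` wrapper, with a hypothetical `CountingWriter` type rather than the crate's internal tracked writer:

use std::io::{self, Write};

// Sketch of a byte-counting writer: every successful write bumps a counter,
// so bytes_written() is a free accessor. Names are hypothetical.
struct CountingWriter<W: Write> {
    inner: W,
    bytes_written: usize,
}

impl<W: Write> CountingWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, bytes_written: 0 }
    }

    fn bytes_written(&self) -> usize {
        self.bytes_written
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes_written += n; // count only what the sink accepted
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}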
