Bring some methods over from ArrowWriter to the async version #5251

Merged 4 commits on Feb 24, 2024
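This PR adds `bytes_written` to the sync `ArrowWriter` and brings `flushed_row_groups`, `in_progress_size`, `in_progress_rows`, `bytes_written`, and an explicit async `flush` over to `AsyncArrowWriter`, each delegating to the wrapped sync writer. A minimal sketch of the resulting async surface, assuming a tokio runtime with the `macros` and `fs` features and an illustrative output path (the method calls themselves follow the diff below):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::AsyncArrowWriter;
use parquet::errors::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from_value(0, 512)) as ArrayRef,
    )])?;

    // Illustrative sink; any AsyncWrite + Unpin + Send works
    let file = tokio::fs::File::create("/tmp/example.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), 1024, None)?;

    writer.write(&batch).await?;

    // The new inspection methods mirror the sync ArrowWriter
    assert_eq!(writer.in_progress_rows(), 512);
    assert!(writer.in_progress_size() > 0);
    assert!(writer.bytes_written() >= 4); // at least the "PAR1" magic

    // Explicitly close the in-progress row group and push buffered bytes downstream
    writer.flush().await?;
    assert_eq!(writer.flushed_row_groups().len(), 1);

    writer.close().await?;
    Ok(())
}
```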
10 changes: 9 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
@@ -163,6 +163,11 @@ impl<W: Write + Send> ArrowWriter<W> {
.unwrap_or_default()
}

/// Returns the number of bytes written by this instance
pub fn bytes_written(&self) -> usize {
self.writer.bytes_written()
}

/// Encodes the provided [`RecordBatch`]
///
/// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
@@ -2694,6 +2699,7 @@ mod tests {
// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial header
writer.write(&batch).unwrap();

// updated on write
@@ -2706,10 +2712,12 @@
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), 10);

- // cleared on flush
+ // in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().unwrap();
}
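On the sync side, the test above pins down the accounting: `bytes_written` starts at 4 because a Parquet file opens with the 4-byte `PAR1` magic, and it only grows once a row group is actually flushed. A small sketch of that lifecycle, assuming an in-memory `Vec<u8>` sink and that nothing triggers an early row-group flush:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

fn main() -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from_value(0, 512)) as ArrayRef,
    )])?;

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None)?;

    // Only the 4-byte "PAR1" magic has been written at this point
    assert_eq!(writer.bytes_written(), 4);

    // Rows are buffered in the in-progress row group, not yet in the sink
    writer.write(&batch)?;
    assert_eq!(writer.bytes_written(), 4);

    // Flushing encodes the row group, so the byte count jumps
    writer.flush()?;
    assert!(writer.bytes_written() > 4);

    writer.close()?;
    Ok(())
}
```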
77 changes: 75 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
@@ -56,7 +56,7 @@ use std::{io::Write, sync::Arc};
use crate::{
arrow::ArrowWriter,
errors::{ParquetError, Result},
- file::properties::WriterProperties,
+ file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
@@ -109,6 +109,26 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
})
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
self.sync_writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
pub fn in_progress_size(&self) -> usize {
self.sync_writer.in_progress_size()
}

/// Returns the number of rows buffered in the in progress row group
pub fn in_progress_rows(&self) -> usize {
self.sync_writer.in_progress_rows()
}

/// Returns the number of bytes written by this instance
pub fn bytes_written(&self) -> usize {
self.sync_writer.bytes_written()
}

/// Enqueues the provided `RecordBatch` to be written
///
/// After every sync write by the inner [ArrowWriter], the inner buffer will be
@@ -123,6 +143,14 @@
.await
}

/// Flushes all buffered rows into a new row group
pub async fn flush(&mut self) -> Result<()> {
self.sync_writer.flush()?;
Self::try_flush(&mut self.shared_buffer, &mut self.async_writer, 0).await?;

Ok(())
}

/// Append [`KeyValue`] metadata in addition to those in [`WriterProperties`]
///
/// This method allows appending metadata after [`RecordBatch`]es are written.
@@ -206,7 +234,8 @@ impl Write for SharedBuffer {

#[cfg(test)]
mod tests {
- use arrow_array::{ArrayRef, BinaryArray, Int64Array, RecordBatchReader};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatchReader};
use bytes::Bytes;
use tokio::pin;

@@ -395,4 +424,48 @@ mod tests {

assert_eq!(to_write, read);
}

#[tokio::test]
async fn in_progress_accounting() {
// define schema
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

// create some data
let a = Int32Array::from_value(0_i32, 512);

// build a record batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

for buffer_size in [0, 8, 1024] {
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer =
AsyncArrowWriter::try_new(file, batch.schema(), buffer_size, None).unwrap();

// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
writer.write(&batch).await.unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());

// updated on second write
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().await.unwrap();
}
}
}
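Because `flushed_row_groups` hands back `RowGroupMetaDataPtr`s, the async writer can now report per-row-group statistics before the file is closed. A sketch under the same assumptions (the helper name is hypothetical; `num_rows` and `compressed_size` are existing `RowGroupMetaData` accessors):

```rust
use arrow_array::RecordBatch;
use parquet::arrow::AsyncArrowWriter;
use parquet::errors::Result;
use tokio::io::AsyncWrite;

// Hypothetical helper: write the same batch into two explicit row groups
async fn write_two_row_groups<W: AsyncWrite + Unpin + Send>(
    mut writer: AsyncArrowWriter<W>,
    batch: &RecordBatch,
) -> Result<()> {
    // Each explicit flush closes the current in-progress row group
    writer.write(batch).await?;
    writer.flush().await?;
    writer.write(batch).await?;
    writer.flush().await?;

    // Per-row-group metadata is available before close()
    for (i, rg) in writer.flushed_row_groups().iter().enumerate() {
        println!(
            "row group {i}: {} rows, {} compressed bytes",
            rg.num_rows(),
            rg.compressed_size()
        );
    }

    writer.close().await?;
    Ok(())
}
```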
5 changes: 5 additions & 0 deletions parquet/src/file/writer.rs
@@ -394,6 +394,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {

self.buf.into_inner()
}

/// Returns the number of bytes written to this instance
pub fn bytes_written(&self) -> usize {
self.buf.bytes_written()
}
}

/// Parquet row group writer API.
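The counter itself lives one layer down: `SerializedFileWriter` tracks every byte it pushes to its output, including the magic written when the file is started, and both Arrow writers simply delegate to it. A minimal low-level sketch, assuming an illustrative one-column schema string:

```rust
use std::sync::Arc;

use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn main() -> parquet::errors::Result<()> {
    // Illustrative single-column schema
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 a; }")?);
    let props = Arc::new(WriterProperties::builder().build());

    let mut buffer = Vec::new();
    let writer = SerializedFileWriter::new(&mut buffer, schema, props)?;

    // The 4-byte "PAR1" magic is written as soon as the file starts
    assert_eq!(writer.bytes_written(), 4);

    writer.close()?;
    Ok(())
}
```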