feat: more efficient parquet writer and more statistics #1397

Merged May 28, 2023 (4 commits)
Changes from 1 commit
efficient writer and stats
wjones127 committed May 28, 2023
commit 1dbab88c8be79aefab450f02c2c3b42c930b6843
41 changes: 11 additions & 30 deletions rust/src/operations/writer.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
 use crate::action::Add;
 use crate::storage::ObjectStoreRef;
 use crate::writer::record_batch::{divide_by_partition_values, PartitionResult};
-use crate::writer::stats::{apply_null_counts, create_add, NullCounts};
+use crate::writer::stats::create_add;
 use crate::writer::utils::{
     arrow_schema_without_partitions, record_batch_without_partitions, PartitionPath,
     ShareableBuffer,
@@ -16,7 +16,6 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
-use log::warn;
 use object_store::{path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
@@ -269,7 +268,6 @@ pub(crate) struct PartitionWriter {
     buffer: ShareableBuffer,
     arrow_writer: ArrowWriter<ShareableBuffer>,
     part_counter: usize,
-    null_counts: NullCounts,
     files_written: Vec<Add>,
 }

@@ -293,7 +291,6 @@ impl PartitionWriter {
             buffer,
             arrow_writer,
             part_counter: 0,
-            null_counts: NullCounts::new(),
             files_written: Vec::new(),
         })
     }
@@ -307,11 +304,8 @@ impl PartitionWriter {
         self.config.prefix.child(file_name)
     }

-    fn replace_arrow_buffer(
-        &mut self,
-        seed: impl AsRef<[u8]>,
-    ) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
-        let new_buffer = ShareableBuffer::from_bytes(seed.as_ref());
+    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
+        let new_buffer = ShareableBuffer::default();
         let arrow_writer = ArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
@@ -324,40 +318,27 @@
     }

     fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        // copy current cursor bytes so we can recover from failures
-        // TODO is copying this something we should be doing?
-        let buffer_bytes = self.buffer.to_vec();
wjones127 (Collaborator, Author) commented on the removed line above:

    This was the offending line causing a lot of data copying.

(A rough sketch of why this copy was expensive follows this file's diff.)
-        match self.arrow_writer.write(batch) {
-            Ok(_) => {
-                apply_null_counts(&batch.clone().into(), &mut self.null_counts, 0);
-                Ok(())
-            }
-            Err(err) => {
-                // if a write fails we need to reset the state of the PartitionWriter
-                warn!("error writing to arrow buffer, resetting writer state.");
-                self.replace_arrow_buffer(buffer_bytes)?;
-                Err(err.into())
-            }
-        }
+        Ok(self.arrow_writer.write(batch)?)
     }

     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
-        let (writer, buffer) = self.replace_arrow_buffer(vec![])?;
-        let null_counts = std::mem::take(&mut self.null_counts);
+        let (writer, buffer) = self.reset_writer()?;
         let metadata = writer.close()?;
+        let buffer = match buffer.into_inner() {
+            Some(buffer) => Bytes::from(buffer),
+            None => return Ok(()), // Nothing to write
+        };

         // collect metadata
         let path = self.next_data_path();
-        let obj_bytes = Bytes::from(buffer.to_vec());
-        let file_size = obj_bytes.len() as i64;
+        let file_size = buffer.len() as i64;

         // write file to object store
-        self.object_store.put(&path, obj_bytes).await?;
+        self.object_store.put(&path, buffer).await?;
         self.files_written.push(
             create_add(
                 &self.config.partition_values,
-                null_counts,
                 path.to_string(),
                 file_size,
                 &metadata,
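
Why that snapshot was costly: write_batch copied the entire accumulated buffer before every incoming batch, so the total bytes copied grew quadratically with the number of batches per file. A rough sketch, not delta-rs code; the batch size and batch count below are assumptions for illustration, not measurements from the PR:

// Illustrative sketch: total bytes copied by snapshotting a growing buffer
// before each batch write (old path) versus copying nothing (new path).
fn main() {
    let batch_size: u64 = 8 * 1024 * 1024; // assumed: 8 MiB buffered per batch
    let num_batches: u64 = 100; // assumed: batches accumulated per file

    // Old path: batch k first snapshots the ~k * batch_size bytes already
    // buffered, so the copies sum to batch_size * (0 + 1 + ... + 99).
    let old_bytes_copied: u64 = (0..num_batches).map(|k| k * batch_size).sum();

    // New path: no defensive snapshot; a failed write is handled by
    // resetting the writer instead of restoring old bytes.
    let new_bytes_copied: u64 = 0;

    println!(
        "old: {:.1} GiB copied, new: {} bytes copied",
        old_bytes_copied as f64 / (1u64 << 30) as f64,
        new_bytes_copied
    );
}

With these assumed numbers the old path copies roughly 38 GiB of snapshots to produce an 800 MiB file; dropping the snapshot removes that cost entirely, and the flush path also avoids a final copy by moving the buffer out with into_inner() rather than cloning it via to_vec().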
6 changes: 1 addition & 5 deletions rust/src/writer/json.rs
@@ -363,19 +363,15 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
         let writers = std::mem::take(&mut self.arrow_writers);
         let mut actions = Vec::new();

-        for (_, mut writer) in writers {
+        for (_, writer) in writers {
             let metadata = writer.arrow_writer.close()?;
             let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
             self.storage.put(&path, obj_bytes).await?;

-            // Replace self null_counts with an empty map. Use the other for stats.
-            let null_counts = std::mem::take(&mut writer.null_counts);
-
             actions.push(create_add(
                 &writer.partition_values,
-                null_counts,
                 path.to_string(),
                 file_size,
                 &metadata,
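
With the null_counts argument gone from create_add, per-column statistics are instead available from the parquet footer that close() returns. A hedged sketch of that idea using the thrift structs in parquet::format; the actual extraction in rust/src/writer/stats.rs may use different helpers:

use parquet::format::FileMetaData;

// Illustrative only: sum the per-column null counts across all row groups
// in the footer metadata returned by ArrowWriter::close(). Valid files have
// the same column count in every row group.
fn null_counts_from_footer(metadata: &FileMetaData) -> Vec<Option<i64>> {
    let num_columns = metadata
        .row_groups
        .first()
        .map(|rg| rg.columns.len())
        .unwrap_or(0);
    let mut totals: Vec<Option<i64>> = vec![Some(0); num_columns];

    for row_group in &metadata.row_groups {
        for (i, column) in row_group.columns.iter().enumerate() {
            let chunk_nulls = column
                .meta_data
                .as_ref()
                .and_then(|m| m.statistics.as_ref())
                .and_then(|s| s.null_count);
            totals[i] = match (totals[i], chunk_nulls) {
                (Some(total), Some(n)) => Some(total + n),
                // A chunk without statistics makes the column total unknown.
                _ => None,
            };
        }
    }
    totals
}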
14 changes: 10 additions & 4 deletions rust/src/writer/mod.rs
@@ -7,7 +7,7 @@ use crate::{DeltaTable, DeltaTableError};
 use arrow::{datatypes::SchemaRef, datatypes::*, error::ArrowError};
 use async_trait::async_trait;
 use object_store::Error as ObjectStoreError;
-use parquet::{basic::LogicalType, errors::ParquetError};
+use parquet::errors::ParquetError;
 use serde_json::Value;

 pub use json::JsonWriter;
@@ -55,9 +55,15 @@ pub(crate) enum DeltaWriterError {
     },

     /// Serialization of delta log statistics failed.
-    #[error("Serialization of delta log statistics failed: {source}")]
-    StatsSerializationFailed {
-        /// error raised during stats serialization.
+    #[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
+    StatsParsingFailed {
+        debug_value: String,
+        logical_type: Option<parquet::basic::LogicalType>,
+    },
+
+    /// JSON serialization failed
+    #[error("Failed to serialize data to JSON: {source}")]
+    JSONSerializationFailed {
         #[from]
         source: serde_json::Error,
     },
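
For reference, the reshaped variants behave as follows under thiserror, which delta-rs already uses for this enum. A standalone sketch; the real enum carries many more variants:

use parquet::basic::LogicalType;
use thiserror::Error;

// Standalone mirror of the two variants from the diff above.
#[derive(Debug, Error)]
enum DeltaWriterError {
    /// A parquet statistics value could not be converted for the delta log.
    #[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
    StatsParsingFailed {
        debug_value: String,
        logical_type: Option<LogicalType>,
    },

    /// Any serde_json failure, converted automatically via #[from].
    #[error("Failed to serialize data to JSON: {source}")]
    JSONSerializationFailed {
        #[from]
        source: serde_json::Error,
    },
}

fn main() {
    let err = DeltaWriterError::StatsParsingFailed {
        debug_value: "not-a-date".to_string(),
        logical_type: Some(LogicalType::Date),
    };
    println!("{err}"); // prints the #[error(...)] display string
}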
6 changes: 1 addition & 5 deletions rust/src/writer/record_batch.rs
@@ -225,19 +225,15 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
         let writers = std::mem::take(&mut self.arrow_writers);
         let mut actions = Vec::new();

-        for (_, mut writer) in writers {
+        for (_, writer) in writers {
             let metadata = writer.arrow_writer.close()?;
             let path = next_data_path(&self.partition_columns, &writer.partition_values, None)?;
             let obj_bytes = Bytes::from(writer.buffer.to_vec());
             let file_size = obj_bytes.len() as i64;
             self.storage.put(&path, obj_bytes).await?;

-            // Replace self null_counts with an empty map. Use the other for stats.
-            let null_counts = std::mem::take(&mut writer.null_counts);
-
             actions.push(create_add(
                 &writer.partition_values,
-                null_counts,
                 path.to_string(),
                 file_size,
                 &metadata,
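
None of this changes the public writer API: callers still hand record batches to the writer and commit, with statistics now derived from the parquet footer at flush time. A usage sketch against the deltalake crate of this era; the table path is hypothetical and method signatures may differ slightly between versions:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed: an existing Delta table at this hypothetical path.
    let mut table = deltalake::open_table("/tmp/example_table").await?;

    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Stats for the Add action come from the parquet footer when the
    // buffered file is flushed, not from per-batch accumulation.
    let mut writer = RecordBatchWriter::for_table(&table)?;
    writer.write(batch).await?;
    writer.flush_and_commit(&mut table).await?;
    Ok(())
}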