Parallelize Stateless (CSV/JSON) File Write Serialization (#7452)
* implement tokio task spawning

* improve error handling

* parallelize single_file true

* cargo fmt

* Update datafusion/core/src/datasource/file_format/write.rs

Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>

* add channel

* cargo fmt

* yield_now if input is fifo unbounded

* fix unbounded option parse

* cargo fmt

* Remove redundant source

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* address comments, add type aliases

* cargo fmt

---------

Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
3 people authored Sep 9, 2023
1 parent 495c25f commit abea893
Showing 7 changed files with 248 additions and 63 deletions.
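
The commit description boils down to one pattern, implemented in serialize_rb_stream_to_object_store in write.rs below: spawn a Tokio task per incoming RecordBatch, forward each task's JoinHandle through a bounded mpsc channel, and have a single writer await the handles in send order. Serialization therefore runs on multiple cores while the output bytes are still written in the original batch order. The following is a minimal, self-contained sketch of that pattern with toy types; it is not DataFusion code, and only the channel-of-handles structure and the 100-handle bound mirror the diff:

use tokio::sync::mpsc;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Stand-in for a stream of RecordBatches: each "batch" is just a Vec<u8>.
    let batches: Vec<Vec<u8>> = (0..8u8).map(|i| vec![i; 4]).collect();

    // Bounded channel of join handles; the bound (100 in write.rs) caps how far
    // serialization can run ahead of the writer.
    let (tx, mut rx) = mpsc::channel::<JoinHandle<(usize, Vec<u8>)>>(100);

    // Producer side: spawn one serialization task per batch and forward its handle.
    let producer = tokio::spawn(async move {
        for batch in batches {
            let handle = tokio::spawn(async move {
                let rows = batch.len();
                // Pretend this is the CPU-heavy CSV/JSON serialization step.
                let bytes: Vec<u8> = batch.iter().map(|b| b.wrapping_add(1)).collect();
                (rows, bytes)
            });
            if tx.send(handle).await.is_err() {
                break; // receiver dropped; stop producing
            }
        }
    });

    // Writer side: await handles in send order, so bytes land in the original
    // batch order even if the spawned tasks finish out of order.
    let mut row_count = 0usize;
    while let Some(handle) = rx.recv().await {
        let (rows, _bytes) = handle.await.expect("serialization task panicked");
        // In write.rs this is `writer.write_all(&bytes).await`.
        row_count += rows;
    }
    producer.await.unwrap();
    println!("serialized {row_count} rows");
}

Ordering falls out of the channel rather than any explicit sequencing: handles are awaited strictly in the order they were sent, regardless of when the tasks complete. The yield_now call in the real code (taken when the input is an unbounded FIFO source) is omitted from this sketch.
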
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -432,6 +432,14 @@ impl BatchSerializer for CsvSerializer {
self.header = false;
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
let new_self = CsvSerializer::new()
.with_builder(self.builder.clone())
.with_header(self.header);
self.header = false;
Ok(Box::new(new_self))
}
}

/// Implements [`DataSink`] for writing to a CSV file.
@@ -579,6 +587,7 @@ impl DataSink for CsvSink {
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -221,6 +221,10 @@ impl BatchSerializer for JsonSerializer {
//drop(writer);
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}

fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Ok(Box::new(JsonSerializer::new()))
}
}

/// Implements [`DataSink`] for writing to a Json file.
@@ -364,6 +368,7 @@ impl DataSink for JsonSink {
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
278 changes: 219 additions & 59 deletions datafusion/core/src/datasource/file_format/write.rs
@@ -33,12 +33,15 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, FileCompression

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_execution::RecordBatchStream;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio::task::{JoinHandle, JoinSet};

/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
/// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -237,29 +240,11 @@ pub enum FileWriterMode {
pub trait BatchSerializer: Unpin + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}

/// Checks if any of the passed writers have encountered an error
/// and if so, all writers are aborted.
async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
result: Result<T>,
writers: &mut [AbortableWrite<W>],
) -> Result<T> {
match result {
Ok(value) => Ok(value),
Err(e) => {
// Abort all writers before returning the error:
for writer in writers {
let mut abort_future = writer.abort_writer();
if let Ok(abort_future) = &mut abort_future {
let _ = abort_future.await;
}
// Ignore errors that occur during abortion,
// We do try to abort all writers before returning error.
}
// After aborting writers return original error.
Err(e)
}
/// Duplicates self to support serializing multiple batches in parallel on multiple cores
fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
Err(DataFusionError::NotImplemented(
"Parallel serialization is not implemented for this file type".into(),
))
}
}

@@ -315,58 +300,233 @@ pub(crate) async fn create_writer(
}
}

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type SerializerType = Box<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
async fn serialize_rb_stream_to_object_store(
mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
mut serializer: Box<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
unbounded_input: bool,
) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType, DataFusionError)>
{
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);

let serialize_task = tokio::spawn(async move {
while let Some(maybe_batch) = data_stream.next().await {
match serializer.duplicate() {
Ok(mut serializer_clone) => {
let handle = tokio::spawn(async move {
let batch = maybe_batch?;
let num_rows = batch.num_rows();
let bytes = serializer_clone.serialize(batch).await?;
Ok((num_rows, bytes))
});
tx.send(handle).await.map_err(|_| {
DataFusionError::Internal(
"Unknown error writing to object store".into(),
)
})?;
if unbounded_input {
tokio::task::yield_now().await;
}
}
Err(_) => {
return Err(DataFusionError::Internal(
"Unknown error writing to object store".into(),
))
}
}
}
Ok(serializer)
});

let mut row_count = 0;
while let Some(handle) = rx.recv().await {
match handle.await {
Ok(Ok((cnt, bytes))) => {
match writer.write_all(&bytes).await {
Ok(_) => (),
Err(e) => {
return Err((
writer,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
))
}
};
row_count += cnt;
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((writer, e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
writer,
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
));
}
}
}

let serializer = match serialize_task.await {
Ok(Ok(serializer)) => serializer,
Ok(Err(e)) => return Err((writer, e)),
Err(_) => {
return Err((
writer,
DataFusionError::Internal("Unknown error writing to object store".into()),
))
}
};
Ok((serializer, writer, row_count as u64))
}

/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
pub(crate) async fn stateless_serialize_and_write_files(
mut data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<Box<dyn BatchSerializer>>,
mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<SerializerType>,
mut writers: Vec<WriterType>,
single_file_output: bool,
unbounded_input: bool,
) -> Result<u64> {
if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
return internal_err!("single_file_output is true, but got more than 1 writer!");
}
let num_partitions = data.len();
if !single_file_output && (num_partitions != writers.len()) {
let num_writers = writers.len();
if !single_file_output && (num_partitions != num_writers) {
return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!");
}
let mut row_count = 0;
// Map errors to DatafusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
// TODO parallelize serialization across partitions and batches within partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
let idx = match single_file_output {
false => part_idx,
true => 0,
};
while let Some(maybe_batch) = data_stream.next().await {
// Write data to files in a round robin fashion:
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
// tracks if any writers encountered an error triggering the need to abort
let mut any_errors = false;
// tracks the specific error triggering abort
let mut triggering_error = None;
// tracks if any errors were encountered in the process of aborting writers.
// if true, we may not have a guarantee that all written data was cleaned up.
let mut any_abort_errors = false;
match single_file_output {
false => {
let mut join_set = JoinSet::new();
for (data_stream, serializer, writer) in data
.into_iter()
.zip(serializers.into_iter())
.zip(writers.into_iter())
.map(|((a, b), c)| (a, b, c))
{
join_set.spawn(async move {
serialize_rb_stream_to_object_store(
data_stream,
serializer,
writer,
unbounded_input,
)
.await
});
}
let mut finished_writers = Vec::with_capacity(num_writers);
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => match res {
Ok((_, writer, cnt)) => {
finished_writers.push(writer);
row_count += cnt;
}
Err((writer, e)) => {
finished_writers.push(writer);
any_errors = true;
triggering_error = Some(e);
}
},
Err(e) => {
// Don't panic, instead try to clean up as many writers as possible.
// If we hit this code, ownership of a writer was not joined back to
// this thread, so we cannot clean it up (hence any_abort_errors is true)
any_errors = true;
any_abort_errors = true;
triggering_error = Some(DataFusionError::Internal(format!(
"Unexpected join error while serializing file {e}"
)));
}
}
}

// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => {
writer.shutdown()
.await
.map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?;
}
}
}
}
true => {
let mut writer = writers.remove(0);
let mut serializer = serializers.remove(0);
let mut cnt;
for data_stream in data.into_iter() {
(serializer, writer, cnt) = match serialize_rb_stream_to_object_store(
data_stream,
serializer,
writer,
unbounded_input,
)
.await
{
Ok((s, w, c)) => (s, w, c),
Err((w, e)) => {
any_errors = true;
triggering_error = Some(e);
writer = w;
break;
}
};
row_count += cnt;
}
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => writer.shutdown().await?,
}
}
}
// Perform cleanup:
let n_writers = writers.len();
for idx in 0..n_writers {
check_for_errors(
writers[idx].shutdown().await.map_err(err_converter),
&mut writers,
)
.await?;

if any_errors {
match any_abort_errors{
true => return Err(DataFusionError::Internal("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written.".into())),
false => match triggering_error {
Some(e) => return Err(e),
None => return Err(DataFusionError::Internal("Unknown Error encountered during writing to ObjectStore. All writers succesfully aborted.".into()))
}
}
}
Ok(row_count as u64)

Ok(row_count)
}
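
The new duplicate hook above defaults to a NotImplemented error, so only serializers that override it (CSV and JSON in this commit) can take the per-batch parallel path. The sketch below is a simplified, hypothetical illustration with plain synchronous types rather than the real async trait; it shows the same idea used by CsvSerializer::duplicate, where each duplicate inherits the current header flag and the original clears it, so only the first copy emits the header even when batches are serialized concurrently:

// Simplified stand-in for the `duplicate` hook; names and types are illustrative.
trait BatchSerializer: Send {
    fn serialize(&mut self, rows: &[String]) -> Vec<u8>;
    fn duplicate(&mut self) -> Box<dyn BatchSerializer>;
}

struct CsvLikeSerializer {
    header: bool,
}

impl BatchSerializer for CsvLikeSerializer {
    fn serialize(&mut self, rows: &[String]) -> Vec<u8> {
        let mut out = Vec::new();
        if self.header {
            out.extend_from_slice(b"col_a,col_b\n");
            self.header = false;
        }
        for row in rows {
            out.extend_from_slice(row.as_bytes());
            out.push(b'\n');
        }
        out
    }

    fn duplicate(&mut self) -> Box<dyn BatchSerializer> {
        // The copy inherits the current header flag; the original clears it so
        // copies handed to later batches do not repeat the header.
        let copy = CsvLikeSerializer { header: self.header };
        self.header = false;
        Box::new(copy)
    }
}

fn main() {
    let mut serializer = CsvLikeSerializer { header: true };
    let mut first = serializer.duplicate();
    let mut second = serializer.duplicate();
    // Only the first duplicate writes the header, even if the two batches are
    // serialized concurrently on different tasks.
    assert!(first.serialize(&["1,2".to_string()]).starts_with(b"col_a"));
    assert!(!second.serialize(&["3,4".to_string()]).starts_with(b"col_a"));
}

In the real trait, duplicate returns Result<Box<dyn BatchSerializer>> and defaults to NotImplemented, so formats whose serialization is not stateless simply do not opt in.
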
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -915,6 +915,7 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
writer_mode,
unbounded_input: self.options().infinite_source,
single_file_output: self.options.single_file,
overwrite,
file_type_writer_options,
(diffs for the remaining 3 changed files not shown)
