diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4d2caccacd0d..4578ab5a4324 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -432,6 +432,14 @@ impl BatchSerializer for CsvSerializer {
         self.header = false;
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
+
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
+        let new_self = CsvSerializer::new()
+            .with_builder(self.builder.clone())
+            .with_header(self.header);
+        self.header = false;
+        Ok(Box::new(new_self))
+    }
 }
 
 /// Implements [`DataSink`] for writing to a CSV file.
@@ -579,6 +587,7 @@ impl DataSink for CsvSink {
             serializers,
             writers,
             self.config.single_file_output,
+            self.config.unbounded_input,
         )
         .await
     }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 27b17d86f9b9..0aa87a9a3228 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -221,6 +221,10 @@ impl BatchSerializer for JsonSerializer {
         //drop(writer);
         Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
     }
+
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
+        Ok(Box::new(JsonSerializer::new()))
+    }
 }
 
 /// Implements [`DataSink`] for writing to a Json file.
@@ -364,6 +368,7 @@ impl DataSink for JsonSink {
             serializers,
             writers,
             self.config.single_file_output,
+            self.config.unbounded_input,
         )
         .await
     }
diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs
index 272eee1fbcc3..222fe5b519aa 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -33,12 +33,15 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, FileCompression
 use async_trait::async_trait;
 use bytes::Bytes;
+use datafusion_execution::RecordBatchStream;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc;
+use tokio::task::{JoinHandle, JoinSet};
 
 /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
 /// It is specifically designed for the `object_store` crate's `put` method and sends
@@ -237,29 +240,11 @@ pub enum FileWriterMode {
 pub trait BatchSerializer: Unpin + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
     async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
-}
-
-/// Checks if any of the passed writers have encountered an error
-/// and if so, all writers are aborted.
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
-    result: Result<T>,
-    writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
-    match result {
-        Ok(value) => Ok(value),
-        Err(e) => {
-            // Abort all writers before returning the error:
-            for writer in writers {
-                let mut abort_future = writer.abort_writer();
-                if let Ok(abort_future) = &mut abort_future {
-                    let _ = abort_future.await;
-                }
-                // Ignore errors that occur during abortion,
-                // We do try to abort all writers before returning error.
-            }
-            // After aborting writers return original error.
-            Err(e)
-        }
+    /// Duplicates self to support serializing multiple batches in parallel on multiple cores
+    fn duplicate(&mut self) -> Result<Box<dyn BatchSerializer>> {
+        Err(DataFusionError::NotImplemented(
+            "Parallel serialization is not implemented for this file type".into(),
+        ))
     }
 }
 
@@ -315,58 +300,233 @@ pub(crate) async fn create_writer(
     }
 }
 
+type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
+type SerializerType = Box<dyn BatchSerializer>;
+
+/// Serializes a single data stream in parallel and writes to an ObjectStore
+/// concurrently. Data order is preserved. In the event of an error,
+/// the ObjectStore writer is returned to the caller in addition to an error,
+/// so that the caller may handle aborting failed writes.
+async fn serialize_rb_stream_to_object_store(
+    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
+    mut serializer: Box<dyn BatchSerializer>,
+    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    unbounded_input: bool,
+) -> std::result::Result<(SerializerType, WriterType, u64), (WriterType, DataFusionError)>
+{
+    let (tx, mut rx) =
+        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
+
+    let serialize_task = tokio::spawn(async move {
+        while let Some(maybe_batch) = data_stream.next().await {
+            match serializer.duplicate() {
+                Ok(mut serializer_clone) => {
+                    let handle = tokio::spawn(async move {
+                        let batch = maybe_batch?;
+                        let num_rows = batch.num_rows();
+                        let bytes = serializer_clone.serialize(batch).await?;
+                        Ok((num_rows, bytes))
+                    });
+                    tx.send(handle).await.map_err(|_| {
+                        DataFusionError::Internal(
+                            "Unknown error writing to object store".into(),
+                        )
+                    })?;
+                    if unbounded_input {
+                        tokio::task::yield_now().await;
+                    }
+                }
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Unknown error writing to object store".into(),
+                    ))
+                }
+            }
+        }
+        Ok(serializer)
+    });
+
+    let mut row_count = 0;
+    while let Some(handle) = rx.recv().await {
+        match handle.await {
+            Ok(Ok((cnt, bytes))) => {
+                match writer.write_all(&bytes).await {
+                    Ok(_) => (),
+                    Err(e) => {
+                        return Err((
+                            writer,
+                            DataFusionError::Execution(format!(
+                                "Error writing to object store: {e}"
+                            )),
+                        ))
+                    }
+                };
+                row_count += cnt;
+            }
+            Ok(Err(e)) => {
+                // Return the writer along with the error
+                return Err((writer, e));
+            }
+            Err(e) => {
+                // Handle task panic or cancellation
+                return Err((
+                    writer,
+                    DataFusionError::Execution(format!(
+                        "Serialization task panicked or was cancelled: {e}"
+                    )),
+                ));
+            }
+        }
+    }
+
+    let serializer = match serialize_task.await {
+        Ok(Ok(serializer)) => serializer,
+        Ok(Err(e)) => return Err((writer, e)),
+        Err(_) => {
+            return Err((
+                writer,
+                DataFusionError::Internal("Unknown error writing to object store".into()),
+            ))
+        }
+    };
+    Ok((serializer, writer, row_count as u64))
+}
+
 /// Contains the common logic for serializing RecordBatches and
 /// writing the resulting bytes to an ObjectStore.
 /// Serialization is assumed to be stateless, i.e.
 /// each RecordBatch can be serialized without any
 /// dependency on the RecordBatches before or after.
 pub(crate) async fn stateless_serialize_and_write_files(
-    mut data: Vec<SendableRecordBatchStream>,
-    mut serializers: Vec<Box<dyn BatchSerializer>>,
-    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+    data: Vec<SendableRecordBatchStream>,
+    mut serializers: Vec<SerializerType>,
+    mut writers: Vec<WriterType>,
     single_file_output: bool,
+    unbounded_input: bool,
 ) -> Result<u64> {
     if single_file_output && (serializers.len() != 1 || writers.len() != 1) {
         return internal_err!("single_file_output is true, but got more than 1 writer!");
     }
     let num_partitions = data.len();
-    if !single_file_output && (num_partitions != writers.len()) {
+    let num_writers = writers.len();
+    if !single_file_output && (num_partitions != num_writers) {
         return internal_err!("single_file_ouput is false, but did not get 1 writer for each output partition!");
     }
     let mut row_count = 0;
-    // Map errors to DatafusionError.
-    let err_converter =
-        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-    // TODO parallelize serialization accross partitions and batches within partitions
-    // see: https://github.com/apache/arrow-datafusion/issues/7079
-    for (part_idx, data_stream) in data.iter_mut().enumerate().take(num_partitions) {
-        let idx = match single_file_output {
-            false => part_idx,
-            true => 0,
-        };
-        while let Some(maybe_batch) = data_stream.next().await {
-            // Write data to files in a round robin fashion:
-            let serializer = &mut serializers[idx];
-            let batch = check_for_errors(maybe_batch, &mut writers).await?;
-            row_count += batch.num_rows();
-            let bytes =
-                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
-            let writer = &mut writers[idx];
-            check_for_errors(
-                writer.write_all(&bytes).await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
+    // tracks if any writers encountered an error triggering the need to abort
+    let mut any_errors = false;
+    // tracks the specific error triggering abort
+    let mut triggering_error = None;
+    // tracks if any errors were encountered in the process of aborting writers.
+    // if true, we may not have a guarantee that all written data was cleaned up.
+    let mut any_abort_errors = false;
+    match single_file_output {
+        false => {
+            let mut join_set = JoinSet::new();
+            for (data_stream, serializer, writer) in data
+                .into_iter()
+                .zip(serializers.into_iter())
+                .zip(writers.into_iter())
+                .map(|((a, b), c)| (a, b, c))
+            {
+                join_set.spawn(async move {
+                    serialize_rb_stream_to_object_store(
+                        data_stream,
+                        serializer,
+                        writer,
+                        unbounded_input,
+                    )
+                    .await
+                });
+            }
+            let mut finished_writers = Vec::with_capacity(num_writers);
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(res) => match res {
+                        Ok((_, writer, cnt)) => {
+                            finished_writers.push(writer);
+                            row_count += cnt;
+                        }
+                        Err((writer, e)) => {
+                            finished_writers.push(writer);
+                            any_errors = true;
+                            triggering_error = Some(e);
+                        }
+                    },
+                    Err(e) => {
+                        // Don't panic, instead try to clean up as many writers as possible.
+                        // If we hit this code, ownership of a writer was not joined back to
+                        // this thread, so we cannot clean it up (hence any_abort_errors is true)
+                        any_errors = true;
+                        any_abort_errors = true;
+                        triggering_error = Some(DataFusionError::Internal(format!(
+                            "Unexpected join error while serializing file {e}"
+                        )));
+                    }
+                }
+            }
+
+            // Finalize or abort writers as appropriate
+            for mut writer in finished_writers.into_iter() {
+                match any_errors {
+                    true => {
+                        let abort_result = writer.abort_writer();
+                        if abort_result.is_err() {
+                            any_abort_errors = true;
+                        }
+                    }
+                    false => {
+                        writer.shutdown()
+                            .await
+                            .map_err(|_| DataFusionError::Internal("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!".into()))?;
+                    }
+                }
+            }
+        }
+        true => {
+            let mut writer = writers.remove(0);
+            let mut serializer = serializers.remove(0);
+            let mut cnt;
+            for data_stream in data.into_iter() {
+                (serializer, writer, cnt) = match serialize_rb_stream_to_object_store(
+                    data_stream,
+                    serializer,
+                    writer,
+                    unbounded_input,
+                )
+                .await
+                {
+                    Ok((s, w, c)) => (s, w, c),
+                    Err((w, e)) => {
+                        any_errors = true;
+                        triggering_error = Some(e);
+                        writer = w;
+                        break;
+                    }
+                };
+                row_count += cnt;
+            }
+            match any_errors {
+                true => {
+                    let abort_result = writer.abort_writer();
+                    if abort_result.is_err() {
+                        any_abort_errors = true;
+                    }
+                }
+                false => writer.shutdown().await?,
+            }
         }
     }
-    // Perform cleanup:
-    let n_writers = writers.len();
-    for idx in 0..n_writers {
-        check_for_errors(
-            writers[idx].shutdown().await.map_err(err_converter),
-            &mut writers,
-        )
-        .await?;
+
+    if any_errors {
+        match any_abort_errors{
+            true => return Err(DataFusionError::Internal("Error encountered during writing to ObjectStore and failed to abort all writers. Partial result may have been written.".into())),
+            false => match triggering_error {
+                Some(e) => return Err(e),
+                None => return Err(DataFusionError::Internal("Unknown Error encountered during writing to ObjectStore. All writers successfully aborted.".into()))
+            }
+        }
     }
-    Ok(row_count as u64)
+
+    Ok(row_count)
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 07f0a9dc839c..e36252a99566 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -915,6 +915,7 @@ impl TableProvider for ListingTable {
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             writer_mode,
+            unbounded_input: self.options().infinite_source,
             single_file_output: self.options.single_file,
             overwrite,
             file_type_writer_options,
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index a788baf80fee..b4892119b785 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -137,8 +137,15 @@ impl TableProviderFactory for ListingTableFactory {
         let mut statement_options = StatementOptions::from(&cmd.options);
 
         // Extract ListingTable specific options if present or set default
-        // Discard unbounded option if present
-        statement_options.take_str_option("unbounded");
+        let unbounded = if infinite_source {
+            statement_options.take_str_option("unbounded");
+            infinite_source
+        } else {
+            statement_options
+                .take_bool_option("unbounded")?
+                .unwrap_or(false)
+        };
+
         let create_local_path = statement_options
             .take_bool_option("create_local_path")?
             .unwrap_or(false);
@@ -209,11 +216,11 @@ impl TableProviderFactory for ListingTableFactory {
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
-            .with_infinite_source(infinite_source)
             .with_file_sort_order(cmd.order_exprs.clone())
             .with_insert_mode(insert_mode)
             .with_single_file(single_file)
-            .with_write_options(file_type_writer_options);
+            .with_write_options(file_type_writer_options)
+            .with_infinite_source(unbounded);
 
         let resolved_schema = match provided_schema {
             None => options.infer_schema(state, &table_path).await?,
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 46556cb3c7b4..4e444c6e5d06 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -95,6 +95,8 @@ pub struct FileSinkConfig {
     /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
     /// to which each output partition is written to its own output file.
     pub single_file_output: bool,
+    /// If input is unbounded, tokio tasks need to yield to not block execution forever
+    pub unbounded_input: bool,
     /// Controls whether existing data should be overwritten by this sink
     pub overwrite: bool,
     /// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 35f5f964813e..def4d59873df 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -586,6 +586,7 @@ impl DefaultPhysicalPlanner {
                         file_groups: vec![],
                         output_schema: Arc::new(schema),
                         table_partition_cols: vec![],
+                        unbounded_input: false,
                         writer_mode: FileWriterMode::PutMultipart,
                         single_file_output: *single_file_output,
                         overwrite: false,
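
Reviewer aside (not part of the patch): the ordering trick in `serialize_rb_stream_to_object_store` above is that per-batch serialization tasks run concurrently, while their `JoinHandle`s travel through a bounded mpsc channel and are awaited in send order. The sketch below reduces that pattern to plain tokio; the chunk data, the `producer` task, and the local `unbounded_input` flag (mirroring `FileSinkConfig::unbounded_input`) are illustrative only, not DataFusion code.

```rust
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let unbounded_input = false; // stand-in for FileSinkConfig::unbounded_input
    // Bounded channel of handles; the bound (100 in the patch) caps in-flight batches.
    let (tx, mut rx) = mpsc::channel::<JoinHandle<Vec<u8>>>(4);

    // Producer side: spawn one serialization task per chunk and forward its handle.
    let producer = tokio::spawn(async move {
        for chunk in 0..8u32 {
            let handle = tokio::spawn(async move {
                // Stand-in for `serializer_clone.serialize(batch)`.
                chunk.to_le_bytes().to_vec()
            });
            if tx.send(handle).await.is_err() {
                break; // receiver dropped, stop producing
            }
            if unbounded_input {
                // With an infinite source, yield so the write side is never starved.
                tokio::task::yield_now().await;
            }
        }
    });

    // Writer side: await handles in the order they were sent, so output order
    // matches input order even though serialization ran concurrently.
    let mut output = Vec::new();
    while let Some(handle) = rx.recv().await {
        let bytes = handle.await.expect("serialization task panicked");
        output.extend_from_slice(&bytes);
    }
    producer.await.unwrap();
    println!("wrote {} bytes in input order", output.len());
}
```

Awaiting handles in channel order is what preserves output order without forcing serialization itself to be sequential, and the channel bound limits how many serialized batches can pile up ahead of the writer.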