7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
@@ -353,6 +353,13 @@ config_namespace! {
/// Sets bloom filter number of distinct values. If NULL, uses
/// default parquet writer setting
pub bloom_filter_ndv: Option<u64>, default = None

/// Controls whether DataFusion will attempt to speed up writing
/// large parquet files by serializing multiple smaller parquet files
/// in memory in parallel and then stitching them together into a
/// single large file. This results in faster write speeds but higher
/// memory usage.
pub allow_single_file_parallelism: bool, default = true

}
}
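
A minimal usage sketch (not part of this change): the new flag can be toggled through SessionConfig. The helper name and the exact key string below are assumptions based on the `execution.parquet` namespace above.

use datafusion::prelude::SessionConfig;

// Hypothetical helper; disables single-file write parallelism to reduce memory usage.
fn demo_disable_parallel_parquet_writes() -> SessionConfig {
    SessionConfig::new()
        .set_bool("datafusion.execution.parquet.allow_single_file_parallelism", false)
}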

272 changes: 238 additions & 34 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,18 +17,25 @@

//! Parquet format abstractions

use parquet::column::writer::ColumnCloseResult;
use parquet::file::writer::SerializedFileWriter;
use rand::distributions::DistString;
use tokio::io::{AsyncWriteExt, AsyncWrite};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use tokio::task::JoinSet;
use std::sync::mpsc::Sender;
use tokio::task::{JoinHandle, JoinSet};

use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType, FileCompressionType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::{StreamExt, TryStreamExt};
@@ -41,7 +48,7 @@ use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use rand::distributions::Alphanumeric;

use super::write::FileWriterMode;
use super::write::{FileWriterMode, AbortableWrite, create_writer};
use super::FileFormat;
use super::FileScanConfig;
use crate::arrow::array::{
@@ -605,8 +612,9 @@ impl ParquetSink {
Self { config }
}

// Create a write for parquet files
async fn create_writer(
/// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
/// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
async fn create_async_arrow_writer(
&self,
file_meta: FileMeta,
object_store: Arc<dyn ObjectStore>,
@@ -639,26 +647,10 @@ impl ParquetSink {
}
}
}
}

#[async_trait]
impl DataSink for ParquetSink {
async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = data.len();
let parquet_props = self
.config
.file_type_writer_options
.try_into_parquet()?
.writer_options();

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

/// Creates an AsyncArrowWriter for each partition to be written out
/// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
async fn create_all_async_arrow_writers(
    &self,
    num_partitions: usize,
    parquet_props: &WriterProperties,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<AsyncArrowWriter<Box<dyn AsyncWrite + Send + Unpin>>>> {
// Construct writer for each file group
let mut writers = vec![];
match self.config.writer_mode {
@@ -689,7 +681,7 @@ impl DataSink for ParquetSink {
e_tag: None,
};
let writer = self
.create_writer(
.create_async_arrow_writer(
object_meta.into(),
object_store.clone(),
parquet_props.clone(),
@@ -707,7 +699,7 @@ impl DataSink for ParquetSink {
e_tag: None,
};
let writer = self
.create_writer(
.create_async_arrow_writer(
object_meta.into(),
object_store.clone(),
parquet_props.clone(),
@@ -719,10 +711,70 @@ impl DataSink for ParquetSink {
}
}

Ok(writers)
}
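
// Illustrative sketch (not part of this change): how one of the returned
// writers is typically driven, mirroring the non-parallel path further below.
// The function name and bindings are placeholders.
async fn demo_drive_writer(
    mut writer: AsyncArrowWriter<Box<dyn AsyncWrite + Send + Unpin>>,
    mut stream: SendableRecordBatchStream,
) -> Result<()> {
    // Write each batch asynchronously, then close to flush the parquet footer.
    while let Some(batch) = stream.next().await.transpose()? {
        writer.write(&batch).await?;
    }
    writer.close().await?;
    Ok(())
}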

/// Creates an object store writer for each output partition
/// This is used when parallelizing individual parquet file writes.
async fn create_object_store_writers(
    &self,
    num_partitions: usize,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>> {
    let mut writers = Vec::new();

    for _ in 0..num_partitions {
        let file_path = self.config.table_paths[0].prefix();
        let object_meta = ObjectMeta {
            location: file_path.clone(),
            last_modified: chrono::offset::Utc::now(),
            size: 0,
            e_tag: None,
        };
        writers.push(
            create_writer(
                FileWriterMode::PutMultipart,
                FileCompressionType::UNCOMPRESSED,
                object_meta.into(),
                object_store.clone(),
            )
            .await?,
        );
    }

    Ok(writers)
}
}
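
// Illustrative sketch (not part of this change): a multipart object store
// writer is driven by streaming bytes with write_all and finalizing with
// shutdown, as done near the end of write_all below. The function name and
// bindings are placeholders.
async fn demo_drive_object_store_writer(
    mut w: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
    bytes: Vec<u8>,
) -> Result<()> {
    // Push serialized parquet bytes, then finalize the multipart upload.
    w.write_all(&bytes).await?;
    w.shutdown().await?;
    Ok(())
}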


#[async_trait]
impl DataSink for ParquetSink {
async fn write_all(
&self,
mut data: Vec<SendableRecordBatchStream>,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = data.len();
let parquet_props = self
.config
.file_type_writer_options
.try_into_parquet()?
.writer_options();

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

let mut row_count = 0;

let allow_single_file_parallelism = context
.session_config()
.options()
.execution
.parquet
.allow_single_file_parallelism;

match self.config.single_file_output {
false => {
let mut writers = self.create_all_async_arrow_writers(num_partitions, parquet_props, object_store.clone()).await?;
let mut join_set: JoinSet<Result<usize, DataFusionError>> =
JoinSet::new();
for (mut data_stream, mut writer) in
@@ -754,23 +806,175 @@ impl DataSink for ParquetSink {
}
}
true => {
let mut writer = writers.remove(0);
for data_stream in data.iter_mut() {
while let Some(batch) = data_stream.next().await.transpose()? {
row_count += batch.num_rows();
// TODO cleanup all multipart writes when any encounters an error
writer.write(&batch).await?;

if !allow_single_file_parallelism || data.len() <= 1 {
    let mut writer = self
        .create_all_async_arrow_writers(num_partitions, parquet_props, object_store.clone())
        .await?
        .remove(0);
for data_stream in data.iter_mut() {
while let Some(batch) = data_stream.next().await.transpose()? {
row_count += batch.num_rows();
writer.write(&batch).await?;
}
}

writer.close().await?;
} else {
let mut object_store_writer = self
    .create_object_store_writers(1, object_store)
    .await?
    .remove(0);
let parallelism = data.len();
let mut join_handles: Vec<
JoinHandle<Result<(Vec<u8>, usize), DataFusionError>>,
> = Vec::with_capacity(parallelism);
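// Serialize each input partition into its own in-memory parquet buffer on a
// dedicated task; the resulting buffers are stitched into one file below.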
for _ in 0..parallelism {
let buffer: Vec<u8> = Vec::new();
let mut writer =
parquet::arrow::arrow_writer::ArrowWriter::try_new(
buffer,
self.config.output_schema.clone(),
Some(parquet_props.clone()),
)?;
let mut data_stream = data.remove(0);
join_handles.push(tokio::spawn(async move {
let mut inner_row_count = 0;
while let Some(batch) =
data_stream.next().await.transpose()?
{
inner_row_count += batch.num_rows();
writer.write(&batch)?;
}
let out = writer.into_inner()?;
Ok((out, inner_row_count))
}))
}
}

writer.close().await?;
let mut writer = None;
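// Hand serialized bytes to a background task over an unbounded channel; that
// task streams them to the object store while row groups are still being merged.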
let (tx, mut rx): (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) =
    tokio::sync::mpsc::unbounded_channel();
let writer_join_handle: JoinHandle<
    Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>, DataFusionError>,
> = tokio::task::spawn(async move {
    while let Some(data) = rx.recv().await {
object_store_writer.write_all(data.as_slice()).await?;
}
Ok(object_store_writer)
});
let merged_buff = SharedBuffer::new(1048576);
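// As each serialization task completes, append its row groups to the merged
// file and periodically flush the shared buffer to the upload task.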
for handle in join_handles {
    let join_result = handle.await;
    match join_result {
Ok(result) => {
let (out, num_rows) = result?;
let reader = bytes::Bytes::from(out);
row_count += num_rows;
let metadata = parquet::file::footer::parse_metadata(&reader)?;
let schema = metadata.file_metadata().schema();
writer = match writer {
    Some(writer) => Some(writer),
    None => Some(SerializedFileWriter::new(
        merged_buff.clone(),
        Arc::new(schema.clone()),
        Arc::new(parquet_props.clone()),
    )?),
};

match &mut writer{
Some(w) => {
// Note: cannot use .await within this loop as RowGroupMetaData is not Send
// Instead, use a non-blocking channel to send bytes to separate worker
// which will write to ObjectStore.
for rg in metadata.row_groups() {
let mut rg_out = w.next_row_group()?;
for column in rg.columns() {
let result = ColumnCloseResult {
bytes_written: column.compressed_size() as _,
rows_written: rg.num_rows() as _,
metadata: column.clone(),
bloom_filter: None,
column_index: None,
offset_index: None,
};
rg_out.append_column(&reader, result)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > 1024000 {
    let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
    tx.send(bytes).map_err(|_| {
        DataFusionError::Execution(
            "Failed to send bytes to ObjectStore writer".into(),
        )
    })?;
}
}
rg_out.close()?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > 10240 {
    let bytes: Vec<u8> = buff_to_flush.drain(..).collect();
    tx.send(bytes).map_err(|_| {
        DataFusionError::Execution(
            "Failed to send bytes to ObjectStore writer".into(),
        )
    })?;
}
}
},
None => unreachable!("Parquet writer should always be initialized in first iteration of loop!")
}
}
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
unreachable!();
}
}
}
}
let inner_writer = writer.unwrap().into_inner()?;
let mut final_buff = inner_writer.buffer.try_lock().unwrap();

// Explicitly drop tx to signal to rx we are done sending data
drop(tx);

let mut object_store_writer = match writer_join_handle.await {
    Ok(r) => r?,
    Err(e) => {
        if e.is_panic() {
            std::panic::resume_unwind(e.into_panic())
        } else {
            unreachable!()
        }
    }
};
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;
}
}
}

Ok(row_count as u64)
}
}

/// A buffer with interior mutability shared by the SerializedFileWriter and
/// ObjectStore writer
#[derive(Clone)]
struct SharedBuffer {
/// The inner buffer for reading and writing
///
/// The lock is used to obtain interior mutability; lock contention is
/// not a concern here.
buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
}

impl SharedBuffer {
pub fn new(capacity: usize) -> Self {
Self {
buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
}
}
}

impl Write for SharedBuffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut buffer = self.buffer.try_lock().unwrap();
Write::write(&mut *buffer, buf)
}

fn flush(&mut self) -> std::io::Result<()> {
let mut buffer = self.buffer.try_lock().unwrap();
Write::flush(&mut *buffer)
}
}
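
// Illustrative sketch (not part of this change) of the intended usage: one
// clone of the buffer is handed to the SerializedFileWriter while another
// handle is drained and forwarded to the object store writer. The function
// name and bindings are placeholders.
fn demo_shared_buffer() -> std::io::Result<()> {
    let buf = SharedBuffer::new(1048576);
    let mut writer_side = buf.clone();
    // Writes go through the std::io::Write impl above.
    writer_side.write_all(b"example bytes")?;
    // A separate holder of the same Arc can drain the accumulated bytes.
    let drained: Vec<u8> = buf.buffer.try_lock().unwrap().drain(..).collect();
    assert_eq!(drained, b"example bytes".to_vec());
    Ok(())
}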

#[cfg(test)]
pub(crate) mod test_util {
use super::*;