Parallelize Stateless (CSV/JSON) File Write Serialization #7452

Merged — 15 commits merged on Sep 9, 2023

Conversation

devinjdangelo (Contributor) commented Aug 30, 2023

Which issue does this PR close?

Part of #7079

Rationale for this change

Serialization of "stateless" file types (where the serialized bytes of each record batch have no dependency on the serialized bytes of any other record batch) can be parallelized efficiently across all available CPU cores, significantly decreasing the time needed to write out a file.

There is a tradeoff between write speed and memory utilization. If the ObjectStore writer cannot keep up with the data being serialized, bytes can accumulate in memory. ObjectStore puts are concurrent but not parallelized, so the risk of higher memory usage grows with the number of cores in the system. We can bound this increase in memory by throttling serialization tasks when more than X RecordBatches have been serialized but not yet written to the ObjectStore (this is now implemented in this PR; a simplified sketch follows below).
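
A minimal, hypothetical sketch of that throttling idea (the function name write_all_batches, the MAX_IN_FLIGHT bound, and the Vec<u8> "batches" are illustrative stand-ins, not the PR's actual types or settings): each batch is serialized in its own tokio task, and a bounded channel of JoinHandles caps how many serialized-but-unwritten results can be in flight, so serialization backs off when the writer falls behind.

use tokio::{sync::mpsc, task::JoinHandle};

// Illustrative bound on serialized-but-unwritten batches; not the PR's setting.
const MAX_IN_FLIGHT: usize = 100;

async fn write_all_batches(batches: Vec<Vec<u8>>) {
    let (tx, mut rx) = mpsc::channel::<JoinHandle<Vec<u8>>>(MAX_IN_FLIGHT);

    // Producer: spawn one "serialization" task per batch. `send` waits once the
    // channel is full, which throttles the producer when the writer lags.
    let producer = tokio::spawn(async move {
        for batch in batches {
            let handle = tokio::spawn(async move {
                // a real implementation would run the BatchSerializer here
                batch
            });
            if tx.send(handle).await.is_err() {
                break; // receiver dropped; stop producing
            }
        }
    });

    // Consumer: await handles in send order and write sequentially, so the
    // output preserves the original batch order.
    while let Some(handle) = rx.recv().await {
        if let Ok(bytes) = handle.await {
            // a real implementation would call writer.write_all(&bytes).await here
            let _ = bytes;
        }
    }
    let _ = producer.await;
}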

What changes are included in this PR?

Spawn a tokio task to serialize each record batch in parallel for "stateless" file types, and concurrently write the serialized bytes to an ObjectStore.

Benchmarking

To benchmark the performance difference in this PR, the following script is used. First, generate the TPC-H SF10 data:

cd datafusion/benchmarks
./bench.sh data tpch10

Then run the following Rust program against the generated data:
use chrono;
use datafusion::prelude::*;
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use std::{io::Error, sync::Arc, time::Instant};
use url::Url;

const FILENAME: &str =
    "/home/dev/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions::default();

    let start = Instant::now();
    let _df = _ctx
        .read_parquet(FILENAME, _read_options)
        .await
        .unwrap()
        //select a few columns with types compatible with write_json method
        .select_columns(&["l_orderkey", "l_partkey", "l_receiptdate"])?
        .repartition(Partitioning::Hash(vec![col("l_orderkey")], 4))?
        .cache()
        .await?;
    let elapsed = Instant::now() - start;
    println!("read parquet to memory took -> {elapsed:?}");

    let start3 = Instant::now();
    _df.clone()
        .write_csv("file://local/home/dev/arrow-datafusion/test_out/")
        .await?;
    let elapsed3 = Instant::now() - start3;
    println!("write as csv to disk took -> {elapsed3:?}");

    let start2 = Instant::now();
    _df.clone()
        .write_json("file://local/home/dev/arrow-datafusion/test_out/")
        .await?;
    let elapsed2 = Instant::now() - start2;
    println!("write as json to disk took -> {elapsed2:?}");

    Ok(())
}

The tables below report results on a 16c/32t development server with 128 GB of memory writing to fast local NVMe storage. This is an ideal case; scaling will not be as good on systems with fewer threads, significantly less memory, or slower/remote storage.

Execution Time in Seconds

              main     This PR   % diff
1 JSON file   220.31   20.76     -91%
1 CSV file    144.31   11.82     -92%
4 JSON files  224.14   19.05     -92%
4 CSV files   147.11   11.46     -92%

Peak Memory Usage in GB

              main   This PR   % diff
1 JSON file   0.55   3.9       609%
1 CSV file    0.55   0.85      55%
4 JSON files  0.65   3.9       500%
4 CSV files   0.65   1.00      54%

After incorporating the channel method written by @metesynnada to introduce backpressure, the memory usage increase is consistently ~200MB in all tests with no noticeable drop in performance.

Note: the peak memory usage figures subtract the peak memory required to cache the DataFrame without performing any writes.

On a laptop with 15 threads, I see 28.42s and 42.94s for the 4 CSV and 4 JSON file tests, respectively.

Without the caching step, each write only takes about 1 extra second.

What about Parquet?

Serializing a parquet file is more complex, since the parquet writer maintains internal state across record batches. Serializing two different parquet files in parallel is trivial, but parallelizing serialization of a single large parquet file is more difficult. It may be possible by constructing multiple row groups in parallel. I intend to work on this in another PR.

Are these changes tested?

Yes, by existing tests. Additional tests to verify abort behavior may be desirable.

Are there any user-facing changes?

No, just faster writes!

github-actions bot added the core (Core DataFusion crate) label on Aug 30, 2023
}
false => {
// TODO if we encounter an error during shutdown, delete previously written files?
writer.shutdown()
devinjdangelo (Contributor, Author):

I think this is the trickiest case to ensure the write is atomic. Suppose we have two writers A and B. Writer A could successfully commit and shut down. Then, before Writer B can complete, a network or hardware fault could prevent Writer B from finalizing or Writer A from aborting.

For this to be atomic, we would need some way to simultaneously commit all or none of our multipart writers. I don't think ObjectStores (S3 etc.) support a way to do that.

Downstream table providers could make this atomic in practice via an atomic metadata operation, which I believe is how DeltaLake and friends work.

Contributor:

Yes, I agree that if someone wants atomic commit/rollback they should build that in at a higher level than datafusion -- there isn't much we can do with just the object store API

}
},
Err(_) => {
// Don't panic, instead try to clean up as many writers as possible.
devinjdangelo (Contributor, Author):

Join errors (perhaps because a thread was killed by the OS?) could also result in non-atomic writes, since we would have no way to recover ownership of the writer and abort it.

Contributor:

You should return the error after aborting the writer, in case an execution error occurs within the plans.

@@ -336,6 +336,7 @@ mod unix_test {

/// It tests the INSERT INTO functionality.
#[tokio::test]
#[ignore]
devinjdangelo (Contributor, Author):

This test is deadlocking. I think this has to do with how this test is spawning threads.

Based on the other tests passing, I don't believe this PR has broken anything with FIFO tables.

devinjdangelo (Contributor, Author):

@mustafasrepo Any idea how to fix this test for this PR?

Contributor:

I am currently checking for any potential problems.

metesynnada (Contributor) commented Sep 4, 2023:

It's not the way the threads are started that's the issue, but rather the assumption that if a FIFO file receives 10 rows and the batch size is also 10, it will create a record batch. The current implementation of serialize_rb_stream_to_object_store requires access to all data before it can proceed.

while let Some(maybe_batch) = data_stream.next().await {
    let mut serializer_clone = match serializer.duplicate() {
        Ok(s) => s,
        Err(_) => {
            return Err((
                writer,
                DataFusionError::Internal(
                    "Unknown error writing to object store".into(),
                ),
            ))
        }
    };
    serialize_tasks.push(task::spawn(async move {
        let batch = maybe_batch?;
        let num_rows = batch.num_rows();
        let bytes = serializer_clone.serialize(batch).await?;
        Ok((num_rows, bytes))
    }));
}

devinjdangelo marked this pull request as ready for review on August 31, 2023 at 16:46

devinjdangelo (Contributor, Author):

@alamb Any tips on how to benchmark this more scientifically? Would adding "insert into" and "copy" queries to our existing benchmarking framework be a reasonable idea?

Comment on lines 306 to 377
async fn serialize_rb_stream_to_object_store(
    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    mut serializer: Box<dyn BatchSerializer>,
    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<
    (
        Box<dyn BatchSerializer>,
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        u64,
    ),
    (
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        DataFusionError,
    ),
> {
    let mut row_count = 0;
    // Not using JoinSet here since we want to ultimately write to ObjectStore preserving file order
    let mut serialize_tasks: Vec<JoinHandle<Result<(usize, Bytes), DataFusionError>>> =
        Vec::new();
    while let Some(maybe_batch) = data_stream.next().await {
        let mut serializer_clone = match serializer.duplicate() {
            Ok(s) => s,
            Err(_) => {
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ),
                ))
            }
        };
        serialize_tasks.push(task::spawn(async move {
            let batch = maybe_batch?;
            let num_rows = batch.num_rows();
            let bytes = serializer_clone.serialize(batch).await?;
            Ok((num_rows, bytes))
        }));
    }
    for serialize_result in serialize_tasks {
        let result = serialize_result.await;
        match result {
            Ok(res) => {
                let (cnt, bytes) = match res {
                    Ok(r) => r,
                    Err(e) => return Err((writer, e)),
                };
                row_count += cnt;
                match writer.write_all(&bytes).await {
                    Ok(_) => (),
                    Err(_) => {
                        return Err((
                            writer,
                            DataFusionError::Internal(
                                "Unknown error writing to object store".into(),
                            ),
                        ))
                    }
                };
            }
            Err(_) => {
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ),
                ))
            }
        }
    }

    Ok((serializer, writer, row_count as u64))
}
Contributor:

Suggested change — replace the function above with the following channel-based version:

async fn serialize_rb_stream_to_object_store(
    mut data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    mut serializer: Box<dyn BatchSerializer>,
    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> std::result::Result<
    (
        Box<dyn BatchSerializer>,
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        u64,
    ),
    (
        AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
        DataFusionError,
    ),
> {
    let (tx, mut rx) =
        mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100); // buffer size of 100, adjust as needed
    let serialize_task = tokio::spawn(async move {
        while let Some(maybe_batch) = data_stream.next().await {
            match serializer.duplicate() {
                Ok(mut serializer_clone) => {
                    let handle = tokio::spawn(async move {
                        let batch = maybe_batch?;
                        let num_rows = batch.num_rows();
                        let bytes = serializer_clone.serialize(batch).await?;
                        Ok((num_rows, bytes))
                    });
                    tx.send(handle).await.map_err(|_| {
                        DataFusionError::Internal(
                            "Unknown error writing to object store".into(),
                        )
                    })?;
                    yield_now().await;
                }
                Err(_) => {
                    return Err(DataFusionError::Internal(
                        "Unknown error writing to object store".into(),
                    ))
                }
            }
        }
        Ok(serializer)
    });
    let mut row_count = 0;
    while let Some(handle) = rx.recv().await {
        match handle.await {
            Ok(Ok((cnt, bytes))) => {
                match writer.write_all(&bytes).await {
                    Ok(_) => (),
                    Err(_) => {
                        return Err((
                            writer,
                            DataFusionError::Internal(
                                "Unknown error writing to object store".into(),
                            ),
                        ))
                    }
                };
                row_count += cnt;
            }
            Ok(Err(e)) => {
                // Return the writer along with the error
                return Err((writer, e));
            }
            Err(_) => {
                // Handle task panic or cancellation
                return Err((
                    writer,
                    DataFusionError::Internal(
                        "Serialization task panicked or was cancelled".into(),
                    ),
                ));
            }
        }
    }
    let serializer = match serialize_task.await {
        Ok(Ok(serializer)) => serializer,
        Ok(Err(e)) => return Err((writer, e)),
        Err(_) => {
            return Err((
                writer,
                DataFusionError::Internal("Unknown error writing to object store".into()),
            ))
        }
    };
    Ok((serializer, writer, row_count as u64))
}

How about this?

devinjdangelo (Contributor, Author):

This is fantastic, thank you @metesynnada! I tested this, and with just a small change it resulted in identical performance with much lower memory usage due to the backpressure introduced by the channel. The FIFO test is also passing now, so win-win!

devinjdangelo (Contributor, Author):

@metesynnada Actually, I just realized that the FIFO test passing depends on leaving the yield_now().await call in. That call results in the serialize tasks being run in sequence rather than in parallel, though, so performance is poor. I'm not quite sure how to get the FIFO test to pass without the yield_now().await call.

Contributor:

I believe we are facing the same issue at #5278. We could apply the same heuristic to #6290.

metesynnada (Contributor) commented Sep 6, 2023:

Also, is your profiling runtime multi-threaded?

devinjdangelo (Contributor, Author):

Thanks for the issue links. I tried calling yield_now().await every N iterations, but the FIFO insert test only passes when N==1 (i.e. yield_now().await on every iteration).

I just pushed up a possible option: call yield_now() if the input is unbounded, otherwise skip it. That lets bounded tables benefit fully from the parallelization while still allowing the FIFO tests to pass as they currently do (rough sketch below).

The FIFO test does not use a multithreaded runtime, but when I benchmark performance I am running in a multithreaded runtime, yes.
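
A rough sketch of the conditional yield described above (the unbounded_input flag and the maybe_yield helper are illustrative names, not the exact code in this PR):

use tokio::task::yield_now;

// Yield after spawning each serialization task only when the input is
// unbounded (e.g. a FIFO source), so the downstream consumer can make
// progress; bounded inputs skip the yield and keep full parallelism.
async fn maybe_yield(unbounded_input: bool) {
    if unbounded_input {
        yield_now().await;
    }
}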

Contributor:

I will check the entire pull request tomorrow. Thank you for your hard work. I am also pleased that the new mechanism has contributed to the performance.

devinjdangelo and others added 3 commits September 5, 2023 11:31
Co-authored-by: Metehan Yıldırım <100111937+metesynnada@users.noreply.github.com>
alamb (Contributor) left a comment:

Thank you @devinjdangelo -- I plan to study this PR carefully tomorrow

metesynnada (Contributor) left a comment:

The PR looks great overall. Once the outstanding issues are resolved, I can give it a final review.

// tracks if any errors were encountered in the process of aborting writers.
// if true, we may not have a guarantee that all written data was cleaned up.
let mut any_abort_errors = false;
match single_file_output {
Contributor:

Can be if single_file_output { ... } else { ... }


// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
Contributor:

Can be if any_errors { ... } else { ... }

};
Ok((serializer, writer, row_count as u64))
}

/// Contains the common logic for serializing RecordBatches and
/// writing the resulting bytes to an ObjectStore.
/// Serialization is assumed to be stateless, i.e.
/// each RecordBatch can be serialized without any
/// dependency on the RecordBatches before or after.
pub(crate) async fn stateless_serialize_and_write_files(
Contributor:

The blocks inside the match serialize_rb_stream_to_object_store can be refactored into separate functions for better readability and maintainability.

pub(crate) async fn stateless_serialize_and_write_files(
    data: Vec<SendableRecordBatchStream>,
    mut serializers: Vec<Box<dyn BatchSerializer>>,
    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
    single_file_output: bool,
    unbounded_input: bool,
) -> Result<u64> {
    if single_file_output {
        if serializers.len() != 1 || writers.len() != 1 {
            return internal_err!("single_file_output is true, but got more than 1 writer!");
        }
        return serialize_single_file(data, serializers.remove(0), writers.remove(0), unbounded_input).await;
    } else {
        if data.len() != writers.len() {
            return internal_err!("Did not get 1 writer for each output partition!");
        }
        return serialize_multiple_files(data, serializers, writers, unbounded_input).await;
    }
}

mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
unbounded_input: bool,
) -> std::result::Result<
(
Contributor:

The function signature is quite complex with nested types. Using type aliases can simplify the signature and make it more readable, e.g.:

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type ResultTuple = (Box<dyn BatchSerializer>, WriterType, u64);
type ErrorTuple = (WriterType, DataFusionError); 
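
With those aliases in scope, the signature from the diff above would reduce to roughly the following (a sketch only; the aliases are assumed to be defined next to the function, and the body is unchanged):

async fn serialize_rb_stream_to_object_store(
    data_stream: Pin<Box<dyn RecordBatchStream + Send>>,
    serializer: Box<dyn BatchSerializer>,
    writer: WriterType,
) -> std::result::Result<ResultTuple, ErrorTuple>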

Ok((num_rows, bytes))
});
tx.send(handle).await.map_err(|_| {
DataFusionError::Internal(
Contributor:

Define the return error as a closure at the beginning of the method to make it common.
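
A small sketch of that suggestion (the closure name store_err is hypothetical):

use datafusion_common::DataFusionError;

// Illustrative only: build the repeated "unknown error" once via a closure so
// each failure site can call store_err() instead of reconstructing it inline.
fn example() -> Result<(), DataFusionError> {
    let store_err =
        || DataFusionError::Internal("Unknown error writing to object store".into());
    // e.g. tx.send(handle).await.map_err(|_| store_err())?;
    Err(store_err())
}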


@@ -137,8 +137,15 @@ impl TableProviderFactory for ListingTableFactory {
let mut statement_options = StatementOptions::from(&cmd.options);

// Extract ListingTable specific options if present or set default
// Discard unbounded option if present
statement_options.take_str_option("unbounded");
let unbounded = if infinite_source {
Contributor:

You can remove this logic since we support unbounded tables with CREATE UNBOUNDED EXTERNAL TABLE.

devinjdangelo (Contributor, Author):

That syntax is supported, but the test for FIFO insert passes unbounded as a string option. The logic here was added to handle either case. Create unbounded syntax takes precedence, but if it is not present we check for OPTION (unbounded true).

https://github.com/apache/arrow-datafusion/blob/1dd0e988ceb6ecc1df4053fd7d065c15c6e8b431/datafusion/core/tests/fifo.rs#L399

metesynnada (Contributor) commented Sep 7, 2023:

I see, I think it is a leftover. I guess we can change the test to use CREATE UNBOUNDED EXTERNAL TABLE, if I am not missing anything.

devinjdangelo (Contributor, Author):

I think it is OK if both methods are supported. That would provide backwards compatibility for anyone who used this syntax in past versions.

@@ -213,7 +220,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode)
.with_single_file(single_file)
.with_write_options(file_type_writer_options);
.with_write_options(file_type_writer_options)
.with_infinite_source(unbounded);
Contributor:

Please remove the line with .with_infinite_source(unbounded); as it duplicates the logic above.

Contributor:

I took the liberty of doing so in e6f1b2f

devinjdangelo (Contributor, Author):

Thanks!

alamb (Contributor) left a comment:

This is pretty compelling @devinjdangelo 👍 Thank you so much -- this is a great feature for DataFusion. I can't wait to see parallelized parquet writes :)

I ran some performance experiments locally and it shows the same performance you report in the PR description.

I think there are a few suggestions from @metesynnada that would make the code in this PR easier to read, but I think they could potentially be done as follow-ons.

Performance Results (at least 4x faster)

This branch:

❯ copy 'traces' to 'foo.csv';
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 2.882 seconds.

Main

❯ copy 'traces' to 'foo.csv';
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 11.532 seconds.


/// concurrently. Data order is preserved. In the event of an error,
/// the ObjectStore writer is returned to the caller in addition to an error,
/// so that the caller may handle aborting failed writes.
async fn serialize_rb_stream_to_object_store(
Contributor:

I have some ideas on how to simplify this code, which I will try out shortly, but I think it can be merged like this too.

Contributor:

I tried to rewrite this as a futures::stream computation using buffered -- but I got stuck on some "higher-ranked lifetime error" so I think this is about as good as it is going to get

alamb (Contributor) commented Sep 8, 2023:

From my point of view this PR is ready to go -- there are still some suggestions from @metesynnada, but I am not sure if they feel any are required prior to merge.

alamb (Contributor) commented Sep 9, 2023:

> @alamb Any tips on how to benchmark this more scientifically? Would adding "insert into" and "copy" queries to our existing benchmarking framework be a reasonable idea?

It would be great to add "write" / "copy" queries to our benchmarks. Since DataFusion only recently started supporting such statements, we don't have the same infrastructure for benchmarking them that we do for queries.

alamb (Contributor) commented Sep 9, 2023:

I think all comments have now been addressed. Thank you very much @devinjdangelo and @metesynnada -- this is really cool to see. 🚀

alamb merged commit abea893 into apache:main on Sep 9, 2023
21 checks passed
Labels: core (Core DataFusion crate), enhancement (New feature or request)
4 participants