
Commit 0b9b30a

Add write_csv to DataFrame (#1922)
* Add write_csv to DataFrame
* Cleanup
* Update write_csv signature
1 parent dd94fcf commit 0b9b30a
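
For orientation, a minimal usage sketch of the API this commit adds, based on the signatures in the diff below (the input file `example.csv`, its columns `a` and `b`, and the output directory `out` are assumptions for illustration, not part of the commit):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // Assumed input; any registered table works the same way.
    ctx.register_csv("example", "example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx.sql("SELECT a, b FROM example").await?;
    // New in this commit: writes one CSV file per output partition
    // into the `out` directory, which the call creates.
    df.write_csv("out").await?;
    Ok(())
}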

File tree

5 files changed: +148 -35 lines changed


datafusion/src/dataframe.rs

Lines changed: 3 additions & 0 deletions
@@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync {
     /// # }
     /// ```
     fn except(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;
+
+    /// Write a `DataFrame` to a CSV file.
+    async fn write_csv(&self, path: &str) -> Result<()>;
 }
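
Note that `async fn` is not natively allowed in traits on the Rust versions of this era; this presumably relies on the `#[async_trait]` attribute already applied to the `DataFrame` trait (the attribute sits outside this hunk, so that is an inference, not shown by the diff). An implementor then writes, in sketch form:

#[async_trait]
impl DataFrame for DataFrameImpl {
    async fn write_csv(&self, path: &str) -> Result<()> {
        // ...see dataframe_impl.rs below for the actual body
    }
    // ...remaining trait methods
}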

datafusion/src/execution/context.rs

Lines changed: 3 additions & 32 deletions
@@ -49,7 +49,7 @@ use std::{fs, path::PathBuf};
 use futures::{StreamExt, TryStreamExt};
 use tokio::task::{self, JoinHandle};
 
-use arrow::{csv, datatypes::SchemaRef};
+use arrow::datatypes::SchemaRef;
 
 use crate::catalog::{
     catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
+use crate::physical_plan::file_format::plan_to_csv;
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udf::ScalarUDF;
 use crate::physical_plan::ExecutionPlan;
@@ -717,37 +718,7 @@ impl ExecutionContext {
         plan: Arc<dyn ExecutionPlan>,
         path: impl AsRef<str>,
     ) -> Result<()> {
-        let path = path.as_ref();
-        // create directory to contain the CSV files (one per partition)
-        let fs_path = Path::new(path);
-        let runtime = self.runtime_env();
-        match fs::create_dir(fs_path) {
-            Ok(()) => {
-                let mut tasks = vec![];
-                for i in 0..plan.output_partitioning().partition_count() {
-                    let plan = plan.clone();
-                    let filename = format!("part-{}.csv", i);
-                    let path = fs_path.join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = csv::Writer::new(file);
-                    let stream = plan.execute(i, runtime.clone()).await?;
-                    let handle: JoinHandle<Result<()>> = task::spawn(async move {
-                        stream
-                            .map(|batch| writer.write(&batch?))
-                            .try_collect()
-                            .await
-                            .map_err(DataFusionError::from)
-                    });
-                    tasks.push(handle);
-                }
-                futures::future::join_all(tasks).await;
-                Ok(())
-            }
-            Err(e) => Err(DataFusionError::Execution(format!(
-                "Could not create directory {}: {:?}",
-                path, e
-            ))),
-        }
+        plan_to_csv(self, plan, path).await
     }
 
     /// Executes a query and writes the results to a partitioned Parquet file.
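
The context-level method keeps its public signature; only its body moves into the new `plan_to_csv` helper. A roughly equivalent call path at the context level, assuming the `df` and `ctx` from the sketch above:

// Obtain the physical plan and hand it to the context, which now
// simply forwards to plan_to_csv ("out" is an assumed directory).
let plan = df.create_physical_plan().await?;
ctx.write_csv(plan, "out").await?;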

datafusion/src/execution/dataframe_impl.rs

Lines changed: 8 additions & 0 deletions
@@ -39,6 +39,7 @@ use crate::{
 use crate::arrow::util::pretty;
 use crate::datasource::TableProvider;
 use crate::datasource::TableType;
+use crate::physical_plan::file_format::plan_to_csv;
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
 };
@@ -313,6 +314,13 @@ impl DataFrame for DataFrameImpl {
             &LogicalPlanBuilder::except(left_plan, right_plan, true)?,
         )))
     }
+
+    async fn write_csv(&self, path: &str) -> Result<()> {
+        let plan = self.create_physical_plan().await?;
+        let state = self.ctx_state.lock().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        plan_to_csv(&ctx, plan, path).await
+    }
 }
 
 #[cfg(test)]

datafusion/src/physical_plan/file_format/csv.rs

Lines changed: 133 additions & 3 deletions
@@ -18,18 +18,22 @@
 //! Execution plan for reading CSV files
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
 
+use crate::execution::runtime_env::RuntimeEnv;
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
+use std::fs;
+use std::path::Path;
 use std::sync::Arc;
-
-use crate::execution::runtime_env::RuntimeEnv;
-use async_trait::async_trait;
+use tokio::task::{self, JoinHandle};
 
 use super::file_stream::{BatchIter, FileStream};
 use super::FileScanConfig;
@@ -176,16 +180,59 @@ impl ExecutionPlan for CsvExec {
     }
 }
 
+pub async fn plan_to_csv(
+    context: &ExecutionContext,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+) -> Result<()> {
+    let path = path.as_ref();
+    // create directory to contain the CSV files (one per partition)
+    let fs_path = Path::new(path);
+    let runtime = context.runtime_env();
+    match fs::create_dir(fs_path) {
+        Ok(()) => {
+            let mut tasks = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {
+                let plan = plan.clone();
+                let filename = format!("part-{}.csv", i);
+                let path = fs_path.join(&filename);
+                let file = fs::File::create(path)?;
+                let mut writer = csv::Writer::new(file);
+                let stream = plan.execute(i, runtime.clone()).await?;
+                let handle: JoinHandle<Result<()>> = task::spawn(async move {
+                    stream
+                        .map(|batch| writer.write(&batch?))
+                        .try_collect()
+                        .await
+                        .map_err(DataFusionError::from)
+                });
+                tasks.push(handle);
+            }
+            futures::future::join_all(tasks).await;
+            Ok(())
+        }
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Could not create directory {}: {:?}",
+            path, e
+        ))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::prelude::*;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
         scalar::ScalarValue,
         test_util::aggr_test_schema,
     };
+    use arrow::datatypes::*;
     use futures::StreamExt;
+    use std::fs::File;
+    use std::io::Write;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn csv_exec_with_projection() -> Result<()> {
@@ -376,4 +423,87 @@ mod tests {
         crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
         Ok(())
     }
+
+    /// Generate CSV partitions within the supplied directory
+    fn populate_csv_partitions(
+        tmp_dir: &TempDir,
+        partition_count: usize,
+        file_extension: &str,
+    ) -> Result<SchemaRef> {
+        // define schema for data source (csv file)
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+            Field::new("c3", DataType::Boolean, false),
+        ]));
+
+        // generate a partitioned file
+        for partition in 0..partition_count {
+            let filename = format!("partition-{}.{}", partition, file_extension);
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..=10 {
+                let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+                file.write_all(data.as_bytes())?;
+            }
+        }
+
+        Ok(schema)
+    }
+
+    #[tokio::test]
+    async fn write_csv_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_target_partitions(8),
+        );
+
+        let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;
+
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // execute a simple query and write the results to CSV
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        df.write_csv(&out_dir).await?;
+
+        // create a new context and verify that the results were saved to a partitioned csv file
+        let mut ctx = ExecutionContext::new();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+        ]));
+
+        // register each partition as well as the top level dir
+        let csv_read_option = CsvReadOptions::new().schema(&schema);
+        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
+            .await?;
+        ctx.register_csv("allparts", &out_dir, csv_read_option)
+            .await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 80);
+
+        Ok(())
+    }
 }
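
As the new `write_csv_results` test exercises, `plan_to_csv` creates the target directory itself, spawns one Tokio task per output partition, and writes the partitions concurrently. For a plan with three output partitions the result looks like (directory name assumed):

out/
  part-0.csv
  part-1.csv
  part-2.csv

Note that the call fails with an `Execution` error if the directory already exists, since `fs::create_dir` is used rather than `create_dir_all`.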

datafusion/src/physical_plan/file_format/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 pub use avro::AvroExec;
+pub(crate) use csv::plan_to_csv;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
