
Commit b276d47

Add test for DataFrame::write_table (#8531)
* add test for DataFrame::write_table
* remove duplicate let df=...
* remove println!
1 parent e1a9177 · commit b276d47
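
The new test exercises DataFrame::write_table, which executes a DataFrame's plan and streams its output partitions into an already-registered table. For orientation, here is a minimal sketch of that API outside the test harness, assuming the public datafusion crate as of this commit; the paths, table names, and the input CSV are illustrative, not part of the change:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::Schema;
    use datafusion::common::{FileType, GetExt};
    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::ListingOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Illustrative source table; any registered table would do.
        ctx.register_csv("input", "data/input.csv", CsvReadOptions::new())
            .await?;
        let df = ctx.sql("SELECT c1, c2 FROM input").await?;

        // The write target must already exist as a table; here it is a
        // listing table backed by a local directory of parquet files.
        std::fs::create_dir_all("data/out")?;
        let schema: Schema = df.schema().into();
        let opts = ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_file_extension(FileType::PARQUET.get_ext());
        ctx.register_listing_table("out", "data/out", opts, Some(Arc::new(schema)), None)
            .await?;

        // write_table runs the query and writes one parquet file per
        // output partition into the table's location.
        df.write_table("out", DataFrameWriteOptions::new()).await?;
        Ok(())
    }

The test below follows the same shape against a temporary directory, then re-reads both a single partition file and the whole output directory to compare schemas and row counts.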

File tree

  • datafusion/core/src/datasource/physical_plan/parquet

1 file changed: +92 −3 lines


datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 92 additions & 3 deletions
@@ -752,7 +752,7 @@ mod tests {
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
-    use crate::datasource::listing::{FileRange, PartitionedFile};
+    use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::context::SessionState;
     use crate::physical_plan::displayable;
@@ -772,8 +772,8 @@ mod tests {
     };
     use arrow_array::Date64Array;
     use chrono::{TimeZone, Utc};
-    use datafusion_common::ScalarValue;
     use datafusion_common::{assert_contains, ToDFSchema};
+    use datafusion_common::{FileType, GetExt, ScalarValue};
     use datafusion_expr::{col, lit, when, Expr};
     use datafusion_physical_expr::create_physical_expr;
     use datafusion_physical_expr::execution_props::ExecutionProps;
@@ -1941,6 +1941,96 @@ mod tests {
         Ok(schema)
     }
 
+    #[tokio::test]
+    async fn write_table_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        // let mut ctx = create_ctx(&tmp_dir, 4).await?;
+        let ctx = SessionContext::new_with_config(
+            SessionConfig::new().with_target_partitions(8),
+        );
+        let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // register a local file system object store for /tmp directory
+        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+        let local_url = Url::parse("file://local").unwrap();
+        ctx.runtime_env().register_object_store(&local_url, local);
+
+        // Configure listing options
+        let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(FileType::PARQUET.get_ext());
+
+        // execute a simple query and write the results to parquet
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        std::fs::create_dir(&out_dir).unwrap();
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        let schema: Schema = df.schema().into();
+        // Register a listing table - this will use all files in the directory as data sources
+        // for the query
+        ctx.register_listing_table(
+            "my_table",
+            &out_dir,
+            listing_options,
+            Some(Arc::new(schema)),
+            None,
+        )
+        .await
+        .unwrap();
+        df.write_table("my_table", DataFrameWriteOptions::new())
+            .await?;
+
+        // create a new context and verify that the results were saved to a partitioned parquet file
+        let ctx = SessionContext::new();
+
+        // get write_id
+        let mut paths = fs::read_dir(&out_dir).unwrap();
+        let path = paths.next();
+        let name = path
+            .unwrap()?
+            .path()
+            .file_name()
+            .expect("Should be a file name")
+            .to_str()
+            .expect("Should be a str")
+            .to_owned();
+        let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
+        let write_id = parsed_id.to_owned();
+
+        // register each partition as well as the top level dir
+        ctx.register_parquet(
+            "part0",
+            &format!("{out_dir}/{write_id}_0.parquet"),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+
+        ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
+            .await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 40);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn write_parquet_results() -> Result<()> {
         // create partitioned input file and context
@@ -1985,7 +2075,6 @@ mod tests {
             .to_str()
             .expect("Should be a str")
             .to_owned();
-        println!("{name}");
         let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
         let write_id = parsed_id.to_owned();
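
To run just the new test locally, a filtered test invocation from the repository root should work (the crate under datafusion/core is named datafusion):

    cargo test -p datafusion write_table_results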
