From 610b11dc5cb84ac51dedf281c65aa9bdf3e23d00 Mon Sep 17 00:00:00 2001
From: allinux
Date: Sat, 7 Sep 2024 19:20:22 +0900
Subject: [PATCH] feat: Add DataFrameWriteOptions support when writing CSV,
 JSON, and Parquet

---
 python/datafusion/dataframe.py | 35 +++++++++++++++++---
 src/dataframe.rs               | 58 ++++++++++++++++++++++++++++++----
 2 files changed, 82 insertions(+), 11 deletions(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 46b8fa1b..628e4d20 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -409,20 +409,33 @@ def except_all(self, other: DataFrame) -> DataFrame:
         """
         return DataFrame(self.df.except_all(other.df))
 
-    def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
+    def write_csv(
+        self,
+        path: str | pathlib.Path,
+        with_header: bool = False,
+        overwrite: bool = False,
+        single_file_output: bool = False,
+        partition_by: list[str] | None = None,
+    ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a CSV file.
 
         Args:
             path: Path of the CSV file to write.
             with_header: If true, output the CSV header row.
+            overwrite: Controls if existing data should be overwritten.
+            single_file_output: Controls if all partitions should be coalesced into a single output file. Generally will have slower performance when set to true.
+            partition_by: Sets which columns should be used for hive-style partitioned writes by name.
         """
-        self.df.write_csv(str(path), with_header)
+        self.df.write_csv(str(path), with_header, overwrite, single_file_output, partition_by or [])
 
     def write_parquet(
         self,
         path: str | pathlib.Path,
         compression: str = "uncompressed",
         compression_level: int | None = None,
+        overwrite: bool = False,
+        single_file_output: bool = False,
+        partition_by: list[str] | None = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
 
@@ -430,16 +443,28 @@ def write_parquet(
             path: Path of the Parquet file to write.
             compression: Compression type to use.
             compression_level: Compression level to use.
+            overwrite: Controls if existing data should be overwritten.
+            single_file_output: Controls if all partitions should be coalesced into a single output file. Generally will have slower performance when set to true.
+            partition_by: Sets which columns should be used for hive-style partitioned writes by name.
         """
-        self.df.write_parquet(str(path), compression, compression_level)
+        self.df.write_parquet(str(path), compression, compression_level, overwrite, single_file_output, partition_by or [])
 
-    def write_json(self, path: str | pathlib.Path) -> None:
+    def write_json(
+        self,
+        path: str | pathlib.Path,
+        overwrite: bool = False,
+        single_file_output: bool = False,
+        partition_by: list[str] | None = None,
+    ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
 
         Args:
             path: Path of the JSON file to write.
+            overwrite: Controls if existing data should be overwritten.
+            single_file_output: Controls if all partitions should be coalesced into a single output file. Generally will have slower performance when set to true.
+            partition_by: Sets which columns should be used for hive-style partitioned writes by name.
         """
-        self.df.write_json(str(path))
+        self.df.write_json(str(path), overwrite, single_file_output, partition_by or [])
 
     def to_arrow_table(self) -> pa.Table:
         """Execute the :py:class:`DataFrame` and convert it into an Arrow Table.
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 3fb8b229..f39b4d01 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -402,7 +402,22 @@ impl PyDataFrame {
     }
 
     /// Write a `DataFrame` to a CSV file.
-    fn write_csv(&self, path: &str, with_header: bool, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        with_header=false,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
+    ))]
+    fn write_csv(
+        &self,
+        path: &str,
+        with_header: bool,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         let csv_options = CsvOptions {
             has_header: Some(with_header),
             ..Default::default()
@@ -411,7 +426,10 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_csv(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(overwrite)
+                    .with_single_file_output(single_file_output)
+                    .with_partition_by(partition_by),
                 Some(csv_options),
             ),
         )?;
@@ -422,13 +440,19 @@
     #[pyo3(signature = (
         path,
         compression="uncompressed",
-        compression_level=None
+        compression_level=None,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
         ))]
     fn write_parquet(
         &self,
         path: &str,
         compression: &str,
         compression_level: Option<u32>,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
         py: Python,
     ) -> PyResult<()> {
         fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
@@ -472,7 +496,10 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_parquet(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(overwrite)
+                    .with_single_file_output(single_file_output)
+                    .with_partition_by(partition_by),
                 Option::from(options),
             ),
         )?;
@@ -480,13 +507,32 @@ impl PyDataFrame {
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
-    fn write_json(&self, path: &str, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        overwrite=false,
+        single_file_output=false,
+        partition_by=vec![],
+    ))]
+    fn write_json(
+        &self,
+        path: &str,
+        overwrite: bool,
+        single_file_output: bool,
+        partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         wait_for_future(
             py,
             self.df
                 .as_ref()
                 .clone()
-                .write_json(path, DataFrameWriteOptions::new(), None),
+                .write_json(
+                    path,
+                    DataFrameWriteOptions::default()
+                        .with_overwrite(overwrite)
+                        .with_single_file_output(single_file_output)
+                        .with_partition_by(partition_by),
+                    None),
         )?;
         Ok(())
     }