Skip to content

Commit f2f9b00

Browse files
committed
refactor: return to the WriterPropertiesBuilder::TryFrom<TableParquetOptions>, and separately add the arrow_schema to the kv_metadata on the TableParquetOptions
1 parent 30448b9 commit f2f9b00

File tree

4 files changed

+77
-72
lines changed

4 files changed

+77
-72
lines changed

datafusion/common/src/config.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,20 @@ impl TableParquetOptions {
15021502
pub fn new() -> Self {
15031503
Self::default()
15041504
}
1505+
1506+
/// Set whether the encoding of the arrow metadata should occur
1507+
/// during the writing of parquet.
1508+
///
1509+
/// Default is to encode the arrow schema in the file kv_metadata.
1510+
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1511+
Self {
1512+
global: ParquetOptions {
1513+
skip_arrow_metadata: skip,
1514+
..self.global
1515+
},
1516+
..self
1517+
}
1518+
}
15051519
}
15061520

15071521
impl ConfigField for TableParquetOptions {

datafusion/common/src/file_options/mod.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ mod tests {
3939

4040
use parquet::{
4141
basic::{Compression, Encoding, ZstdLevel},
42-
file::properties::{EnabledStatistics, WriterVersion},
42+
file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
4343
schema::types::ColumnPath,
4444
};
4545

@@ -78,10 +78,8 @@ mod tests {
7878
table_config.set_config_format(ConfigFileType::PARQUET);
7979
table_config.alter_with_string_hash_map(&option_map)?;
8080

81-
let properties = table_config
82-
.parquet
83-
.into_writer_properties_builder()?
84-
.build();
81+
let properties =
82+
WriterPropertiesBuilder::try_from(&table_config.parquet)?.build();
8583

8684
// Verify the expected options propagated down to parquet crate WriterProperties struct
8785
assert_eq!(properties.max_row_group_size(), 123);
@@ -185,10 +183,8 @@ mod tests {
185183
table_config.set_config_format(ConfigFileType::PARQUET);
186184
table_config.alter_with_string_hash_map(&option_map)?;
187185

188-
let properties = table_config
189-
.parquet
190-
.into_writer_properties_builder()?
191-
.build();
186+
let properties =
187+
WriterPropertiesBuilder::try_from(&table_config.parquet)?.build();
192188

193189
let col1 = ColumnPath::from(vec!["col1".to_owned()]);
194190
let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -59,52 +59,49 @@ impl ParquetWriterOptions {
5959
}
6060

6161
impl TableParquetOptions {
62-
#[deprecated(
63-
since = "44.0.0",
64-
note = "Please use `TableParquetOptions::into_writer_properties_builder` and `TableParquetOptions::into_writer_properties_builder_with_arrow_schema`"
65-
)]
66-
pub fn try_from(table_opts: &TableParquetOptions) -> Result<ParquetWriterOptions> {
62+
/// Add the arrow schema to the parquet kv_metadata.
63+
/// If already exists, then overwrites.
64+
pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
65+
self.key_value_metadata.insert(
66+
ARROW_SCHEMA_META_KEY.into(),
67+
Some(encode_arrow_schema(schema)),
68+
);
69+
}
70+
}
71+
72+
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
73+
type Error = DataFusionError;
74+
75+
fn try_from(parquet_table_options: &TableParquetOptions) -> Result<Self> {
6776
// ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns)
6877
Ok(ParquetWriterOptions {
69-
writer_options: table_opts.into_writer_properties_builder()?.build(),
78+
writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)?
79+
.build(),
7080
})
7181
}
82+
}
7283

73-
/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
74-
///
75-
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
76-
pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
77-
self.into_writer_properties_builder_with_arrow_schema(None)
78-
}
84+
impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
85+
type Error = DataFusionError;
7986

8087
/// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`].
8188
///
82-
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column,
83-
/// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`].
84-
pub fn into_writer_properties_builder_with_arrow_schema(
85-
&self,
86-
to_encode: Option<&Arc<Schema>>,
87-
) -> Result<WriterPropertiesBuilder> {
89+
/// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column.
90+
fn try_from(table_parquet_options: &TableParquetOptions) -> Result<Self> {
8891
// Table options include kv_metadata and col-specific options
8992
let TableParquetOptions {
9093
global,
9194
column_specific_options,
9295
key_value_metadata,
93-
} = self;
96+
} = table_parquet_options;
9497

9598
let mut builder = global.into_writer_properties_builder()?;
9699

97100
// add kv_meta, if any
98-
let mut kv_meta = key_value_metadata.to_owned();
99-
if let Some(schema) = to_encode {
100-
kv_meta.insert(
101-
ARROW_SCHEMA_META_KEY.into(),
102-
Some(encode_arrow_schema(schema)),
103-
);
104-
}
105-
if !kv_meta.is_empty() {
101+
if !key_value_metadata.is_empty() {
106102
builder = builder.set_key_value_metadata(Some(
107-
kv_meta
103+
key_value_metadata
104+
.to_owned()
108105
.drain()
109106
.map(|(key, value)| KeyValue { key, value })
110107
.collect(),
@@ -621,8 +618,7 @@ mod tests {
621618
key_value_metadata: [(key, value)].into(),
622619
};
623620

624-
let writer_props = table_parquet_opts
625-
.into_writer_properties_builder()
621+
let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts)
626622
.unwrap()
627623
.build();
628624
assert_eq!(
@@ -649,10 +645,10 @@ mod tests {
649645
let default_writer_props = WriterProperties::new();
650646

651647
// WriterProperties::try_from(TableParquetOptions::default), a.k.a. using datafusion's defaults
652-
let from_datafusion_defaults = default_table_writer_opts
653-
.into_writer_properties_builder()
654-
.unwrap()
655-
.build();
648+
let from_datafusion_defaults =
649+
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
650+
.unwrap()
651+
.build();
656652

657653
// Expected: how the defaults should not match
658654
assert_ne!(
@@ -705,10 +701,10 @@ mod tests {
705701
// the TableParquetOptions::default, with only the bloom filter turned on
706702
let mut default_table_writer_opts = TableParquetOptions::default();
707703
default_table_writer_opts.global.bloom_filter_on_write = true;
708-
let from_datafusion_defaults = default_table_writer_opts
709-
.into_writer_properties_builder()
710-
.unwrap()
711-
.build();
704+
let from_datafusion_defaults =
705+
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
706+
.unwrap()
707+
.build();
712708

713709
// the WriterProperties::default, with only the bloom filter turned on
714710
let default_writer_props = WriterProperties::builder()
@@ -733,10 +729,10 @@ mod tests {
733729
let mut default_table_writer_opts = TableParquetOptions::default();
734730
default_table_writer_opts.global.bloom_filter_on_write = true;
735731
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
736-
let from_datafusion_defaults = default_table_writer_opts
737-
.into_writer_properties_builder()
738-
.unwrap()
739-
.build();
732+
let from_datafusion_defaults =
733+
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
734+
.unwrap()
735+
.build();
740736

741737
// the WriterProperties::default, with only fpp set
742738
let default_writer_props = WriterProperties::builder()
@@ -765,10 +761,10 @@ mod tests {
765761
let mut default_table_writer_opts = TableParquetOptions::default();
766762
default_table_writer_opts.global.bloom_filter_on_write = true;
767763
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
768-
let from_datafusion_defaults = default_table_writer_opts
769-
.into_writer_properties_builder()
770-
.unwrap()
771-
.build();
764+
let from_datafusion_defaults =
765+
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
766+
.unwrap()
767+
.build();
772768

773769
// the WriterProperties::default, with only ndv set
774770
let default_writer_props = WriterProperties::builder()

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ use parquet::arrow::{
7373
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
7474
};
7575
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
76-
use parquet::file::properties::WriterProperties;
76+
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
7777
use parquet::file::writer::SerializedFileWriter;
7878
use parquet::format::FileMetaData;
7979
use tokio::io::{AsyncWrite, AsyncWriteExt};
@@ -752,24 +752,23 @@ impl ParquetSink {
752752
/// Create writer properties based upon configuration settings,
753753
/// including partitioning and the inclusion of arrow schema metadata.
754754
fn create_writer_props(&self) -> Result<WriterProperties> {
755-
let props = if !self.parquet_options.global.skip_arrow_metadata {
756-
let schema = if self.parquet_options.global.allow_single_file_parallelism {
757-
// If parallelizing writes, we may also be doing hive style partitioning
758-
// into multiple files which impacts the schema per file.
759-
// Refer to `self.get_writer_schema()`
760-
&self.get_writer_schema()
761-
} else {
762-
self.config.output_schema()
763-
};
764-
self.parquet_options
765-
.into_writer_properties_builder_with_arrow_schema(Some(schema))?
766-
.build()
755+
let schema = if self.parquet_options.global.allow_single_file_parallelism {
756+
// If parallelizing writes, we may also be doing hive style partitioning
757+
// into multiple files which impacts the schema per file.
758+
// Refer to `self.get_writer_schema()`
759+
&self.get_writer_schema()
767760
} else {
768-
self.parquet_options
769-
.into_writer_properties_builder()?
770-
.build()
761+
self.config.output_schema()
771762
};
772-
Ok(props)
763+
764+
// TODO: avoid this clone in follow up PR, where the writer properties & schema
765+
// are calculated once on `ParquetSink::new`
766+
let mut parquet_opts = self.parquet_options.clone();
767+
if !self.parquet_options.global.skip_arrow_metadata {
768+
parquet_opts.arrow_schema(schema);
769+
}
770+
771+
Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
773772
}
774773

775774
/// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore

0 commit comments

Comments
 (0)