Skip to content

Commit b54e648

Browse files
authored
Supporting writing schema metadata when writing Parquet in parallel (#13866)
* refactor: make ParquetSink tests a bit more readable * chore(11770): add new ParquetOptions.skip_arrow_metadata * test(11770): demonstrate that the single threaded ParquetSink is already writing the arrow schema in the kv_meta, and allow disablement * refactor(11770): replace with new method, since the kv_metadata is inherent to TableParquetOptions and therefore we should explicitly make the API apparent that you have to include the arrow schema or not * fix(11770): fix parallel ParquetSink to encode arrow schema into the file metadata, based on the ParquetOptions * refactor(11770): provide deprecation warning for TryFrom * test(11770): update tests with new default to include arrow schema * refactor: including partitioning of arrow schema inserted into kv_metadata * test: update tests for new config prop, as well as the new file partition offsets based upon larger metadata * chore: avoid cloning in tests, and update code docs * refactor: return to the WriterPropertiesBuilder::TryFrom<TableParquetOptions>, and separately add the arrow_schema to the kv_metadata on the TableParquetOptions * refactor: require the arrow_schema key to be present in the kv_metadata, if is required by the configuration * chore: update configs.md * test: update tests to handle the (default) required arrow schema in the kv_metadata * chore: add reference to arrow-rs upstream PR
1 parent aafec07 commit b54e648

File tree

16 files changed

+380
-91
lines changed

16 files changed

+380
-91
lines changed

datafusion-cli/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
5353
arrow = { workspace = true }
5454
arrow-array = { workspace = true }
5555
arrow-buffer = { workspace = true }
56+
arrow-ipc = { workspace = true }
5657
arrow-schema = { workspace = true }
58+
base64 = "0.22.1"
5759
half = { workspace = true }
5860
hashbrown = { workspace = true }
5961
indexmap = { workspace = true }

datafusion/common/src/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,12 @@ config_namespace! {
436436
/// valid values are "1.0" and "2.0"
437437
pub writer_version: String, default = "1.0".to_string()
438438

439+
/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
440+
///
441+
/// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
442+
/// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
443+
pub skip_arrow_metadata: bool, default = false
444+
439445
/// (writing) Sets default parquet compression codec.
440446
/// Valid values are: uncompressed, snappy, gzip(level),
441447
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
@@ -1496,6 +1502,20 @@ impl TableParquetOptions {
14961502
pub fn new() -> Self {
14971503
Self::default()
14981504
}
1505+
1506+
/// Set whether the encoding of the arrow metadata should occur
1507+
/// during the writing of parquet.
1508+
///
1509+
/// Default is to encode the arrow schema in the file kv_metadata.
1510+
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
1511+
Self {
1512+
global: ParquetOptions {
1513+
skip_arrow_metadata: skip,
1514+
..self.global
1515+
},
1516+
..self
1517+
}
1518+
}
14991519
}
15001520

15011521
impl ConfigField for TableParquetOptions {

datafusion/common/src/file_options/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ pub mod parquet_writer;
3030
mod tests {
3131
use std::collections::HashMap;
3232

33-
use super::parquet_writer::ParquetWriterOptions;
3433
use crate::{
3534
config::{ConfigFileType, TableOptions},
3635
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -40,7 +39,7 @@ mod tests {
4039

4140
use parquet::{
4241
basic::{Compression, Encoding, ZstdLevel},
43-
file::properties::{EnabledStatistics, WriterVersion},
42+
file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
4443
schema::types::ColumnPath,
4544
};
4645

@@ -79,8 +78,10 @@ mod tests {
7978
table_config.set_config_format(ConfigFileType::PARQUET);
8079
table_config.alter_with_string_hash_map(&option_map)?;
8180

82-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
83-
let properties = parquet_options.writer_options();
81+
let properties = WriterPropertiesBuilder::try_from(
82+
&table_config.parquet.with_skip_arrow_metadata(true),
83+
)?
84+
.build();
8485

8586
// Verify the expected options propagated down to parquet crate WriterProperties struct
8687
assert_eq!(properties.max_row_group_size(), 123);
@@ -184,8 +185,10 @@ mod tests {
184185
table_config.set_config_format(ConfigFileType::PARQUET);
185186
table_config.alter_with_string_hash_map(&option_map)?;
186187

187-
let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
188-
let properties = parquet_options.writer_options();
188+
let properties = WriterPropertiesBuilder::try_from(
189+
&table_config.parquet.with_skip_arrow_metadata(true),
190+
)?
191+
.build();
189192

190193
let col1 = ColumnPath::from(vec!["col1".to_owned()]);
191194
let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,25 @@
1717

1818
//! Options related to how parquet files should be written
1919
20+
use base64::Engine;
21+
use std::sync::Arc;
22+
2023
use crate::{
2124
config::{ParquetOptions, TableParquetOptions},
22-
DataFusionError, Result,
25+
DataFusionError, Result, _internal_datafusion_err,
2326
};
2427

28+
use arrow_schema::Schema;
2529
use parquet::{
30+
arrow::ARROW_SCHEMA_META_KEY,
2631
basic::{BrotliLevel, GzipLevel, ZstdLevel},
27-
file::properties::{
28-
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
29-
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
32+
file::{
33+
metadata::KeyValue,
34+
properties::{
35+
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
36+
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
37+
},
3038
},
31-
format::KeyValue,
3239
schema::types::ColumnPath,
3340
};
3441

@@ -51,6 +58,17 @@ impl ParquetWriterOptions {
5158
}
5259
}
5360

61+
impl TableParquetOptions {
62+
/// Add the arrow schema to the parquet kv_metadata.
63+
/// If already exists, then overwrites.
64+
pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
65+
self.key_value_metadata.insert(
66+
ARROW_SCHEMA_META_KEY.into(),
67+
Some(encode_arrow_schema(schema)),
68+
);
69+
}
70+
}
71+
5472
impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
5573
type Error = DataFusionError;
5674

@@ -79,6 +97,14 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
7997

8098
let mut builder = global.into_writer_properties_builder()?;
8199

100+
// check that the arrow schema is present in the kv_metadata, if configured to do so
101+
if !global.skip_arrow_metadata
102+
&& !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
103+
{
104+
return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
105+
}
106+
107+
// add kv_meta, if any
82108
if !key_value_metadata.is_empty() {
83109
builder = builder.set_key_value_metadata(Some(
84110
key_value_metadata
@@ -140,11 +166,38 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
140166
}
141167
}
142168

169+
/// Encodes the Arrow schema into the IPC format, and base64 encodes it
170+
///
171+
/// TODO: use extern parquet's private method, once publicly available.
172+
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
173+
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
174+
let options = arrow_ipc::writer::IpcWriteOptions::default();
175+
let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
176+
let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
177+
let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
178+
schema,
179+
&mut dictionary_tracker,
180+
&options,
181+
);
182+
183+
// manually prepending the length to the schema as arrow uses the legacy IPC format
184+
// TODO: change after addressing ARROW-9777
185+
let schema_len = serialized_schema.ipc_message.len();
186+
let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
187+
len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
188+
len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
189+
len_prefix_schema.append(&mut serialized_schema.ipc_message);
190+
191+
base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
192+
}
193+
143194
impl ParquetOptions {
144195
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
145196
///
146197
/// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
147198
/// applied per column; a customization which is not applicable for [`ParquetOptions`].
199+
///
200+
/// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
148201
pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
149202
let ParquetOptions {
150203
data_pagesize_limit,
@@ -177,6 +230,7 @@ impl ParquetOptions {
177230
bloom_filter_on_read: _, // reads not used for writer props
178231
schema_force_view_types: _,
179232
binary_as_string: _, // not used for writer props
233+
skip_arrow_metadata: _,
180234
} = self;
181235

182236
let mut builder = WriterProperties::builder()
@@ -444,6 +498,7 @@ mod tests {
444498
bloom_filter_on_read: defaults.bloom_filter_on_read,
445499
schema_force_view_types: defaults.schema_force_view_types,
446500
binary_as_string: defaults.binary_as_string,
501+
skip_arrow_metadata: defaults.skip_arrow_metadata,
447502
}
448503
}
449504

@@ -546,19 +601,55 @@ mod tests {
546601
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
547602
schema_force_view_types: global_options_defaults.schema_force_view_types,
548603
binary_as_string: global_options_defaults.binary_as_string,
604+
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
549605
},
550606
column_specific_options,
551607
key_value_metadata,
552608
}
553609
}
554610

611+
#[test]
612+
fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
613+
// TableParquetOptions, all props set to default
614+
let mut table_parquet_opts = TableParquetOptions::default();
615+
assert!(
616+
!table_parquet_opts.global.skip_arrow_metadata,
617+
"default false, to not skip the arrow schema requirement"
618+
);
619+
620+
// see errors without the schema added, using default settings
621+
let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
622+
assert!(
623+
should_error.is_err(),
624+
"should error without the required arrow schema in kv_metadata",
625+
);
626+
627+
// succeeds if we permit skipping the arrow schema
628+
table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
629+
let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
630+
assert!(
631+
should_succeed.is_ok(),
632+
"should work with the arrow schema skipped by config",
633+
);
634+
635+
// Set the arrow schema back to required
636+
table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
637+
// add the arrow schema to the kv_meta
638+
table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
639+
let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
640+
assert!(
641+
should_succeed.is_ok(),
642+
"should work with the arrow schema included in TableParquetOptions",
643+
);
644+
}
645+
555646
#[test]
556647
fn table_parquet_opts_to_writer_props() {
557648
// ParquetOptions, all props set to non-default
558649
let parquet_options = parquet_options_with_non_defaults();
559650

560651
// TableParquetOptions, using ParquetOptions for global settings
561-
let key = "foo".to_string();
652+
let key = ARROW_SCHEMA_META_KEY.to_string();
562653
let value = Some("bar".into());
563654
let table_parquet_opts = TableParquetOptions {
564655
global: parquet_options.clone(),
@@ -585,14 +676,18 @@ mod tests {
585676
#[test]
586677
fn test_defaults_match() {
587678
// ensure the global settings are the same
588-
let default_table_writer_opts = TableParquetOptions::default();
679+
let mut default_table_writer_opts = TableParquetOptions::default();
589680
let default_parquet_opts = ParquetOptions::default();
590681
assert_eq!(
591682
default_table_writer_opts.global,
592683
default_parquet_opts,
593684
"should have matching defaults for TableParquetOptions.global and ParquetOptions",
594685
);
595686

687+
// selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
688+
default_table_writer_opts =
689+
default_table_writer_opts.with_skip_arrow_metadata(true);
690+
596691
// WriterProperties::default, a.k.a. using extern parquet's defaults
597692
let default_writer_props = WriterProperties::new();
598693

@@ -640,6 +735,7 @@ mod tests {
640735
session_config_from_writer_props(&default_writer_props);
641736
from_extern_parquet.global.created_by = same_created_by;
642737
from_extern_parquet.global.compression = Some("zstd(3)".into());
738+
from_extern_parquet.global.skip_arrow_metadata = true;
643739

644740
assert_eq!(
645741
default_table_writer_opts,
@@ -653,6 +749,7 @@ mod tests {
653749
// the TableParquetOptions::default, with only the bloom filter turned on
654750
let mut default_table_writer_opts = TableParquetOptions::default();
655751
default_table_writer_opts.global.bloom_filter_on_write = true;
752+
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
656753
let from_datafusion_defaults =
657754
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
658755
.unwrap()
@@ -681,6 +778,7 @@ mod tests {
681778
let mut default_table_writer_opts = TableParquetOptions::default();
682779
default_table_writer_opts.global.bloom_filter_on_write = true;
683780
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
781+
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
684782
let from_datafusion_defaults =
685783
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
686784
.unwrap()
@@ -713,6 +811,7 @@ mod tests {
713811
let mut default_table_writer_opts = TableParquetOptions::default();
714812
default_table_writer_opts.global.bloom_filter_on_write = true;
715813
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
814+
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
716815
let from_datafusion_defaults =
717816
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
718817
.unwrap()

0 commit comments

Comments
 (0)