2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions datafusion/common/Cargo.toml
@@ -53,7 +53,9 @@ apache-avro = { version = "0.17", default-features = false, features = [
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
base64 = "0.22.1"
half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
20 changes: 20 additions & 0 deletions datafusion/common/src/config.rs
@@ -436,6 +436,12 @@ config_namespace! {
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".to_string()

/// (writing) Skip encoding the embedded arrow metadata in the kv_metadata
///
/// This is analogous to `ArrowWriterOptions::with_skip_arrow_metadata`.
/// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
pub skip_arrow_metadata: bool, default = false

/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
@@ -1496,6 +1502,20 @@ impl TableParquetOptions {
pub fn new() -> Self {
Self::default()
}

/// Set whether to skip encoding the arrow schema in the
/// parquet kv_metadata during writes.
///
/// Default is to encode the arrow schema in the file kv_metadata.
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
Self {
global: ParquetOptions {
skip_arrow_metadata: skip,
..self.global
},
..self
}
}
}

impl ConfigField for TableParquetOptions {
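As a usage sketch (not part of the diff), the new builder from this hunk chains off `TableParquetOptions::new()`; the assertion relies on the `global` field shown above:

```rust
use datafusion_common::config::TableParquetOptions;

fn main() {
    // Opt out of embedding the arrow schema in the written file's kv_metadata.
    let opts = TableParquetOptions::new().with_skip_arrow_metadata(true);
    assert!(opts.global.skip_arrow_metadata);
}
```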
15 changes: 9 additions & 6 deletions datafusion/common/src/file_options/mod.rs
@@ -30,7 +30,6 @@ pub mod parquet_writer;
mod tests {
use std::collections::HashMap;

use super::parquet_writer::ParquetWriterOptions;
use crate::{
config::{ConfigFileType, TableOptions},
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
@@ -40,7 +39,7 @@ mod tests {

use parquet::{
basic::{Compression, Encoding, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion},
schema::types::ColumnPath,
};

@@ -79,8 +78,10 @@
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = WriterPropertiesBuilder::try_from(
&table_config.parquet.with_skip_arrow_metadata(true),
)?
.build();

// Verify the expected options propagated down to parquet crate WriterProperties struct
assert_eq!(properties.max_row_group_size(), 123);
@@ -184,8 +185,10 @@
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.alter_with_string_hash_map(&option_map)?;

let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?;
let properties = parquet_options.writer_options();
let properties = WriterPropertiesBuilder::try_from(
&table_config.parquet.with_skip_arrow_metadata(true),
)?
.build();

let col1 = ColumnPath::from(vec!["col1".to_owned()]);
let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]);
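For reference, the conversion path these tests now exercise, going straight from `TableParquetOptions` to `WriterProperties`, can be sketched as a small helper (hypothetical function name; error type assumed to be `DataFusionError` via `datafusion_common::Result`, matching the `TryFrom` impls in parquet_writer.rs):

```rust
use datafusion_common::{config::TableParquetOptions, Result};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

// Skipping the arrow metadata avoids the schema-presence check in try_from.
fn props_without_arrow_schema(opts: TableParquetOptions) -> Result<WriterProperties> {
    Ok(WriterPropertiesBuilder::try_from(&opts.with_skip_arrow_metadata(true))?.build())
}
```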
113 changes: 106 additions & 7 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -17,18 +17,25 @@

//! Options related to how parquet files should be written

use base64::Engine;
use std::sync::Arc;

use crate::{
config::{ParquetOptions, TableParquetOptions},
DataFusionError, Result,
DataFusionError, Result, _internal_datafusion_err,
};

use arrow_schema::Schema;
use parquet::{
arrow::ARROW_SCHEMA_META_KEY,
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
file::{
metadata::KeyValue,
properties::{
EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
},
},
format::KeyValue,
schema::types::ColumnPath,
};

@@ -51,6 +58,17 @@ impl ParquetWriterOptions {
}
}

impl TableParquetOptions {
/// Add the arrow schema to the parquet kv_metadata.
/// If the entry already exists, it is overwritten.
pub fn arrow_schema(&mut self, schema: &Arc<Schema>) {
self.key_value_metadata.insert(
ARROW_SCHEMA_META_KEY.into(),
Some(encode_arrow_schema(schema)),
);
}
}

impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
type Error = DataFusionError;

@@ -79,6 +97,14 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {

let mut builder = global.into_writer_properties_builder()?;

// require the arrow schema in the kv_metadata, unless configured to skip it
if !global.skip_arrow_metadata
&& !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY)
{
return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings"));
}

// add kv_meta, if any
if !key_value_metadata.is_empty() {
builder = builder.set_key_value_metadata(Some(
key_value_metadata
Expand Down Expand Up @@ -140,11 +166,38 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
}
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
///
/// TODO: use extern parquet's private method, once publicly available.
/// Refer to <https://github.com/apache/arrow-rs/pull/6916>
fn encode_arrow_schema(schema: &Arc<Schema>) -> String {
let options = arrow_ipc::writer::IpcWriteOptions::default();
let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
let data_gen = arrow_ipc::writer::IpcDataGenerator::default();
let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
schema,
&mut dictionary_tracker,
&options,
);

// manually prepend the continuation marker and length, as arrow uses the legacy IPC format
// TODO: change after addressing ARROW-9777
let schema_len = serialized_schema.ipc_message.len();
let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
len_prefix_schema.append(&mut serialized_schema.ipc_message);

base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema)
}
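
As a sanity check on the framing above, a decoder for this layout (a hypothetical helper, not part of the PR) would strip the continuation marker and length prefix back off:

```rust
use base64::Engine;

// Hypothetical inverse of encode_arrow_schema's framing: base64 decode, then
// strip the 4-byte continuation marker and 4-byte little-endian length prefix.
fn decode_ipc_frame(encoded: &str) -> Option<Vec<u8>> {
    let bytes = base64::prelude::BASE64_STANDARD.decode(encoded).ok()?;
    if bytes.len() < 8 || bytes[..4] != [255u8, 255, 255, 255] {
        return None;
    }
    let len = u32::from_le_bytes(bytes[4..8].try_into().ok()?) as usize;
    (bytes.len() == 8 + len).then(|| bytes[8..].to_vec())
}
```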

impl ParquetOptions {
/// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
///
/// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
/// applied per column, a customization which is not applicable to [`ParquetOptions`].
///
/// Note that this method does not include the key_value_metadata from [`TableParquetOptions`].
pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> {
let ParquetOptions {
data_pagesize_limit,
@@ -177,6 +230,7 @@ impl ParquetOptions {
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
skip_arrow_metadata: _,
} = self;

let mut builder = WriterProperties::builder()
@@ -444,6 +498,7 @@ mod tests {
bloom_filter_on_read: defaults.bloom_filter_on_read,
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
}
}

@@ -546,19 +601,55 @@
bloom_filter_on_read: global_options_defaults.bloom_filter_on_read,
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
},
column_specific_options,
key_value_metadata,
}
}

#[test]
fn table_parquet_opts_to_writer_props_skip_arrow_metadata() {
// TableParquetOptions, all props set to default
let mut table_parquet_opts = TableParquetOptions::default();
assert!(
!table_parquet_opts.global.skip_arrow_metadata,
"defaults to false, meaning the arrow schema is required"
);

// should error when the schema is missing, using default settings
let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts);
assert!(
should_error.is_err(),
"should error without the required arrow schema in kv_metadata",
);

// succeeds if we permit skipping the arrow schema
table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true);
let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
assert!(
should_succeed.is_ok(),
"should work with the arrow schema skipped by config",
);

// Set the arrow schema back to required
table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false);
// add the arrow schema to the kv_meta
table_parquet_opts.arrow_schema(&Arc::new(Schema::empty()));
let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts);
assert!(
should_succeed.is_ok(),
"should work with the arrow schema included in TableParquetOptions",
);
}

#[test]
fn table_parquet_opts_to_writer_props() {
// ParquetOptions, all props set to non-default
let parquet_options = parquet_options_with_non_defaults();

// TableParquetOptions, using ParquetOptions for global settings
let key = "foo".to_string();
let key = ARROW_SCHEMA_META_KEY.to_string();
let value = Some("bar".into());
let table_parquet_opts = TableParquetOptions {
global: parquet_options.clone(),
Expand All @@ -585,14 +676,18 @@ mod tests {
#[test]
fn test_defaults_match() {
// ensure the global settings are the same
let default_table_writer_opts = TableParquetOptions::default();
let mut default_table_writer_opts = TableParquetOptions::default();
let default_parquet_opts = ParquetOptions::default();
assert_eq!(
default_table_writer_opts.global,
default_parquet_opts,
"should have matching defaults for TableParquetOptions.global and ParquetOptions",
);

// selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema)
default_table_writer_opts =
default_table_writer_opts.with_skip_arrow_metadata(true);

// WriterProperties::default, a.k.a. using extern parquet's defaults
let default_writer_props = WriterProperties::new();

@@ -640,6 +735,7 @@ mod tests {
session_config_from_writer_props(&default_writer_props);
from_extern_parquet.global.created_by = same_created_by;
from_extern_parquet.global.compression = Some("zstd(3)".into());
from_extern_parquet.global.skip_arrow_metadata = true;

assert_eq!(
default_table_writer_opts,
Expand All @@ -653,6 +749,7 @@ mod tests {
// the TableParquetOptions::default, with only the bloom filter turned on
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
@@ -681,6 +778,7 @@
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_fpp = Some(0.42);
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
@@ -713,6 +811,7 @@
let mut default_table_writer_opts = TableParquetOptions::default();
default_table_writer_opts.global.bloom_filter_on_write = true;
default_table_writer_opts.global.bloom_filter_ndv = Some(42);
default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema
let from_datafusion_defaults =
WriterPropertiesBuilder::try_from(&default_table_writer_opts)
.unwrap()
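Putting the pieces together, the intended write-side flow under the default settings is: attach the arrow schema, then convert. A minimal sketch (hypothetical helper name; APIs taken from this diff):

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion_common::{config::TableParquetOptions, Result};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};

fn props_with_arrow_schema(schema: &Arc<Schema>) -> Result<WriterProperties> {
    let mut opts = TableParquetOptions::default();
    // skip_arrow_metadata defaults to false, so the schema must be attached
    // to the kv_metadata before conversion, otherwise try_from errors.
    opts.arrow_schema(schema);
    Ok(WriterPropertiesBuilder::try_from(&opts)?.build())
}
```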