25 changes: 25 additions & 0 deletions datafusion/common/src/config.rs
@@ -1661,6 +1661,7 @@ config_field!(bool, value => default_config_transform(value.to_lowercase().as_st
config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);

impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -2786,6 +2787,14 @@ config_namespace! {
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
/// Compression level for the output file. The valid range depends on the
/// compression algorithm:
/// - ZSTD: 1 to 22 (default: 3)
/// - GZIP: 0 to 10 (default: varies by implementation)
Contributor:

What does "varies by implementation" mean here? Does it depend on a system library, or on a Rust crate dependency (in which case ideally we'd know which it is)?

Author:

Good catch @Jefffrey ! You're right that "varies by implementation" is vague. Let me clarify:

The GZIP compression in async-compression uses flate2 under the hood. Looking at the flate2 source code, the default is level 6:

// From https://github.com/rust-lang/flate2-rs/blob/main/src/lib.rs#L220-L224
impl Default for Compression {
    fn default() -> Compression {
        Compression(6)
    }
}

This is the standard zlib/gzip default (going back to the original zlib implementation). The valid range is 0-9 (not 0-10 as I incorrectly wrote).

I'll update the comment to be more precise:

/// - GZIP: 0 to 9 (default: 6)

The reason I was initially cautious is that some compression libraries allow you to swap backends (e.g., flate2 can use miniz_oxide, zlib-rs, or native zlib), but they all follow the same 0-9 range and default to 6 for compatibility.

Would you like me to also update the code to fix the comment and rerun CI workflows?

Contributor:

Yes, we should specify the default if known; leaving it implementation-specific is very confusing to users.

/// - BZIP2: 0 to 9 (default: 6)
/// - XZ: 0 to 9 (default: 6)
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
Contributor:

Should we include level inside compression type CompressionTypeVariant, like

pub enum CompressionTypeVariant {
    /// Gzip-ed file, level 0–9
    Gzip { level: u32 },
    ....

This introduces some API changes, but I think it's cleaner and better for the long term 🤔
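A minimal sketch of the suggested enum-with-level design, to make the API shape concrete. All names and defaults here are illustrative (the real CompressionTypeVariant lives in datafusion_common and is a plain unit-variant enum today):

```rust
// Hypothetical sketch only: carrying the level inside the variant,
// as suggested in the comment above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CompressionTypeVariant {
    Uncompressed,
    Gzip { level: u32 }, // valid range 0-9, default 6
    Zstd { level: u32 }, // valid range 1-22, default 3
}

impl CompressionTypeVariant {
    /// Per-algorithm default level when the user does not specify one.
    fn default_gzip() -> Self {
        CompressionTypeVariant::Gzip { level: 6 }
    }

    /// Read the level back, if the variant carries one.
    fn level(&self) -> Option<u32> {
        match self {
            CompressionTypeVariant::Uncompressed => None,
            CompressionTypeVariant::Gzip { level }
            | CompressionTypeVariant::Zstd { level } => Some(*level),
        }
    }
}

fn main() {
    let c = CompressionTypeVariant::default_gzip();
    assert_eq!(c.level(), Some(6));
    assert_eq!(CompressionTypeVariant::Uncompressed.level(), None);
    println!("ok");
}
```

One upside of this shape is that an invalid combination (a level on UNCOMPRESSED) becomes unrepresentable, at the cost of the API churn the comment mentions.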

pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
@@ -2908,6 +2917,14 @@ impl CsvOptions {
self
}

/// Set the compression level for the output file.
/// The valid range depends on the compression algorithm.
/// If not specified, the default level for the algorithm is used.
pub fn with_compression_level(mut self, level: u32) -> Self {
self.compression_level = Some(level);
self
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
@@ -2933,6 +2950,14 @@ config_namespace! {
/// Options controlling JSON format
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
/// Compression level for the output file. The valid range depends on the
/// compression algorithm:
/// - ZSTD: 1 to 22 (default: 3)
/// - GZIP: 0 to 10 (default: varies by implementation)
/// - BZIP2: 0 to 9 (default: 6)
/// - XZ: 0 to 9 (default: 6)
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
Member:

I just realized that there is no impl JsonOptions with all the with_xyz(mut self, ...) setters like CsvOptions has.

pub schema_infer_max_rec: Option<usize>, default = None
}
}
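The per-algorithm ranges documented above lend themselves to an early validation step. The following is an illustrative sketch only (not part of the PR; the function name is invented), using the gzip range 0–9 established later in the review thread rather than the 0–10 currently in the doc comment:

```rust
/// Illustrative helper: validate a user-supplied compression level
/// against the per-algorithm ranges from the config doc comment.
fn validate_level(algorithm: &str, level: u32) -> Result<(), String> {
    let range = match algorithm {
        "zstd" => 1..=22,
        // Per the review thread, gzip is 0-9 (flate2 default 6).
        "gzip" | "bzip2" | "xz" => 0..=9,
        other => return Err(format!("unknown algorithm: {other}")),
    };
    if range.contains(&level) {
        Ok(())
    } else {
        Err(format!(
            "level {level} out of range {}..={} for {algorithm}",
            range.start(),
            range.end()
        ))
    }
}

fn main() {
    assert!(validate_level("zstd", 3).is_ok());
    assert!(validate_level("gzip", 10).is_err()); // gzip max is 9
    println!("ok");
}
```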
17 changes: 17 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
@@ -31,6 +31,8 @@ pub struct CsvWriterOptions {
/// Compression to apply after ArrowWriter serializes RecordBatches.
/// This compression is applied by DataFusion not the ArrowWriter itself.
pub compression: CompressionTypeVariant,
/// Compression level for the output file.
pub compression_level: Option<u32>,
}

impl CsvWriterOptions {
@@ -41,6 +43,20 @@ impl CsvWriterOptions {
Self {
writer_options,
compression,
compression_level: None,
}
}

/// Create a new `CsvWriterOptions` with the specified compression level.
pub fn new_with_level(
writer_options: WriterBuilder,
compression: CompressionTypeVariant,
compression_level: Option<u32>,
Contributor:

Would it be better to just have compression_level: u32 and direct users to new if they want the default (None) compression level? Thoughts? 🤔

Author:

Looking at flate2::Compression, it uses new(level: u32) + default() rather than Option.

My rationale was config system integration. CsvOptions.compression_level is Option<u32> because users may or may not specify it in config. The new_with_level(..., Option<u32>) signature makes the TryFrom<&CsvOptions> impl straightforward.

But I agree the public API could be cleaner. I could:

  1. Keep new_with_level(..., compression_level: u32) as you suggest (non-optional)
  2. Let TryFrom internally call new() when compression_level is None, or new_with_level() when Some

Would you prefer that approach?

Contributor:

Is this response LLM-generated? It doesn't make sense with regard to the codebase; TryFrom<&CsvOptions> doesn't use new() or new_with_level()

) -> Self {
Self {
writer_options,
compression,
compression_level,
}
}
}
@@ -81,6 +97,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
Ok(CsvWriterOptions {
writer_options: builder,
compression: value.compression,
compression_level: value.compression_level,
})
}
}
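The new / new_with_level constructor pair keeps the common case terse while still admitting an explicit level. A toy mirror of the pattern (types simplified; these are not the real DataFusion structs):

```rust
// Simplified stand-in for CsvWriterOptions / JsonWriterOptions.
#[derive(Debug, PartialEq)]
struct WriterOptions {
    compression: String,
    compression_level: Option<u32>,
}

impl WriterOptions {
    /// Common case: defer to the algorithm's default level.
    fn new(compression: &str) -> Self {
        Self {
            compression: compression.into(),
            compression_level: None,
        }
    }

    /// Explicit case: pin a level (or pass None to keep the default).
    fn new_with_level(compression: &str, compression_level: Option<u32>) -> Self {
        Self {
            compression: compression.into(),
            compression_level,
        }
    }
}

fn main() {
    assert_eq!(WriterOptions::new("zstd").compression_level, None);
    assert_eq!(
        WriterOptions::new_with_level("zstd", Some(9)).compression_level,
        Some(9)
    );
    println!("ok");
}
```

Taking Option<u32> in new_with_level (rather than a bare u32, as the reviewer proposes) lets a config-driven caller forward its optional value without branching; the trade-off is a slightly noisier call site.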
18 changes: 17 additions & 1 deletion datafusion/common/src/file_options/json_writer.rs
@@ -27,11 +27,26 @@ use crate::{
#[derive(Clone, Debug)]
pub struct JsonWriterOptions {
pub compression: CompressionTypeVariant,
pub compression_level: Option<u32>,
}

impl JsonWriterOptions {
pub fn new(compression: CompressionTypeVariant) -> Self {
Self { compression }
Self {
compression,
compression_level: None,
}
}

/// Create a new `JsonWriterOptions` with the specified compression and level.
pub fn new_with_level(
compression: CompressionTypeVariant,
compression_level: Option<u32>,
) -> Self {
Self {
compression,
compression_level,
}
}
}

@@ -41,6 +56,7 @@ impl TryFrom<&JsonOptions> for JsonWriterOptions {
fn try_from(value: &JsonOptions) -> Result<Self> {
Ok(JsonWriterOptions {
compression: value.compression,
compression_level: value.compression_level,
})
}
}
1 change: 1 addition & 0 deletions datafusion/datasource-csv/src/file_format.rs
@@ -780,6 +780,7 @@ impl FileSink for CsvSink {
context,
serializer,
self.writer_options.compression.into(),
self.writer_options.compression_level,
object_store,
demux_task,
file_stream_rx,
1 change: 1 addition & 0 deletions datafusion/datasource-json/src/file_format.rs
@@ -379,6 +379,7 @@ impl FileSink for JsonSink {
context,
serializer,
self.writer_options.compression.into(),
self.writer_options.compression_level,
object_store,
demux_task,
file_stream_rx,
50 changes: 44 additions & 6 deletions datafusion/datasource/src/file_compression_type.rs
@@ -155,25 +155,63 @@ impl FileCompressionType {
}

/// Wrap the given `BufWriter` so that it performs compressed writes
/// according to this `FileCompressionType`.
/// according to this `FileCompressionType` using the default compression level.
pub fn convert_async_writer(
&self,
w: BufWriter,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
self.convert_async_writer_with_level(w, None)
}

/// Wrap the given `BufWriter` so that it performs compressed writes
/// according to this `FileCompressionType`.
///
/// If `compression_level` is `Some`, the encoder will use the specified
/// compression level. If `None`, the default level for each algorithm is used.
pub fn convert_async_writer_with_level(
&self,
w: BufWriter,
compression_level: Option<u32>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
#[cfg(feature = "compression")]
use async_compression::Level;

Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => Box::new(GzipEncoder::new(w)),
GZIP => match compression_level {
Some(level) => {
Box::new(GzipEncoder::with_quality(w, Level::Precise(level as i32)))
}
None => Box::new(GzipEncoder::new(w)),
},
#[cfg(feature = "compression")]
BZIP2 => Box::new(BzEncoder::new(w)),
BZIP2 => match compression_level {
Some(level) => {
Box::new(BzEncoder::with_quality(w, Level::Precise(level as i32)))
}
None => Box::new(BzEncoder::new(w)),
},
#[cfg(feature = "compression")]
XZ => Box::new(XzEncoder::new(w)),
XZ => match compression_level {
Some(level) => {
Box::new(XzEncoder::with_quality(w, Level::Precise(level as i32)))
}
None => Box::new(XzEncoder::new(w)),
},
#[cfg(feature = "compression")]
ZSTD => Box::new(ZstdEncoder::new(w)),
ZSTD => match compression_level {
Some(level) => {
Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32)))
}
None => Box::new(ZstdEncoder::new(w)),
},
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
// compression_level is not used when compression feature is disabled
let _ = compression_level;
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
));
}
UNCOMPRESSED => Box::new(w),
})
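Each codec arm above follows the same shape: Some(level) selects with_quality(Level::Precise(level as i32)), None falls back to the encoder's default constructor. A dependency-free sketch of just that dispatch, with the async_compression Level type stubbed out for illustration:

```rust
// Stand-in for async_compression::Level — illustration only.
#[derive(Debug, PartialEq)]
enum Level {
    Default,
    Precise(i32),
}

/// Mirrors the Some/None dispatch used in each encoder arm of
/// convert_async_writer_with_level.
fn choose_level(compression_level: Option<u32>) -> Level {
    match compression_level {
        // `Level::Precise(level as i32)` in the PR.
        Some(level) => Level::Precise(level as i32),
        // `Encoder::new(w)` uses the codec's own default.
        None => Level::Default,
    }
}

fn main() {
    assert_eq!(choose_level(Some(5)), Level::Precise(5));
    assert_eq!(choose_level(None), Level::Default);
    println!("ok");
}
```

Note the u32-to-i32 cast: async-compression's Level::Precise takes an i32, so very large u32 values would wrap; validating the level against the algorithm's range before this point would sidestep that.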
23 changes: 22 additions & 1 deletion datafusion/datasource/src/write/mod.rs
@@ -131,6 +131,8 @@ pub struct ObjectWriterBuilder {
object_store: Arc<dyn ObjectStore>,
/// The size of the buffer for the object writer.
buffer_size: Option<usize>,
/// The compression level for the object writer.
compression_level: Option<u32>,
}

impl ObjectWriterBuilder {
@@ -145,6 +147,7 @@ impl ObjectWriterBuilder {
location: location.clone(),
object_store,
buffer_size: None,
compression_level: None,
}
}

@@ -202,6 +205,22 @@ impl ObjectWriterBuilder {
self.buffer_size
}

/// Set compression level for object writer.
pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
self.compression_level = compression_level;
}

/// Set compression level for object writer, returning the builder.
pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
self.compression_level = compression_level;
self
}

/// Currently specified compression level.
pub fn get_compression_level(&self) -> Option<u32> {
self.compression_level
}

/// Return a writer object that writes to the object store location.
///
/// If a buffer size has not been set, the default buffer size will
@@ -215,13 +234,15 @@ impl ObjectWriterBuilder {
location,
object_store,
buffer_size,
compression_level,
} = self;

let buf_writer = match buffer_size {
Some(size) => BufWriter::with_capacity(object_store, location, size),
None => BufWriter::new(object_store, location),
};

file_compression_type.convert_async_writer(buf_writer)
file_compression_type
.convert_async_writer_with_level(buf_writer, compression_level)
}
}
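The three new accessors follow the builder convention already used for buffer_size in this file: set_* mutates in place, with_* consumes and returns the builder for chaining, get_* reads the value back. A self-contained sketch of the same trio on a toy builder:

```rust
// Toy stand-in for ObjectWriterBuilder, showing the accessor trio.
#[derive(Default)]
struct ToyWriterBuilder {
    compression_level: Option<u32>,
}

impl ToyWriterBuilder {
    /// In-place setter, for when the builder is held by `&mut`.
    fn set_compression_level(&mut self, level: Option<u32>) {
        self.compression_level = level;
    }

    /// Chaining setter: consumes and returns the builder.
    fn with_compression_level(mut self, level: Option<u32>) -> Self {
        self.compression_level = level;
        self
    }

    /// Read back the currently configured level.
    fn get_compression_level(&self) -> Option<u32> {
        self.compression_level
    }
}

fn main() {
    let b = ToyWriterBuilder::default().with_compression_level(Some(7));
    assert_eq!(b.get_compression_level(), Some(7));

    let mut b = ToyWriterBuilder::default();
    b.set_compression_level(Some(3));
    assert_eq!(b.get_compression_level(), Some(3));
    println!("ok");
}
```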
2 changes: 2 additions & 0 deletions datafusion/datasource/src/write/orchestration.rs
@@ -240,6 +240,7 @@ pub async fn spawn_writer_tasks_and_join(
context: &Arc<TaskContext>,
serializer: Arc<dyn BatchSerializer>,
compression: FileCompressionType,
compression_level: Option<u32>,
object_store: Arc<dyn ObjectStore>,
demux_task: SpawnedTask<Result<()>>,
mut file_stream_rx: DemuxedStreamReceiver,
@@ -265,6 +266,7 @@
.execution
.objectstore_writer_buffer_size,
))
.with_compression_level(compression_level)
.build()?;

if tx_file_bundle
7 changes: 3 additions & 4 deletions datafusion/expr/src/udf.rs
@@ -25,10 +25,9 @@ use crate::udf_eq::UdfEq;
use crate::{ColumnarValue, Documentation, Expr, Signature};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
assert_or_internal_err, not_impl_err, DataFusionError, ExprSchema, Result,
ScalarValue,
};
#[cfg(debug_assertions)]
use datafusion_common::{assert_or_internal_err, DataFusionError};
use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue};
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_expr_common::interval_arithmetic::Interval;
use std::any::Any;
2 changes: 2 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
@@ -461,12 +461,14 @@ message CsvOptions {
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
bytes terminator = 17; // Optional terminator character as a byte
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
optional uint32 compression_level = 19; // Optional compression level
}

// Options controlling CSV format
message JsonOptions {
CompressionTypeVariant compression = 1; // Compression type
optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
optional uint32 compression_level = 3; // Optional compression level
}

message TableParquetOptions {
2 changes: 2 additions & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
@@ -903,6 +903,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
compression: proto_opts.compression().into(),
compression_level: proto_opts.compression_level,
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
date_format: (!proto_opts.date_format.is_empty())
.then(|| proto_opts.date_format.clone()),
@@ -1091,6 +1092,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
Ok(JsonOptions {
compression: compression.into(),
compression_level: proto_opts.compression_level,
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
})
}