Skip to content

Commit 33f8bdc

Browse files
committed
Merge branch 'main' into nested-struct-14757
2 parents 5be2fa7 + 3c4e39a commit 33f8bdc

File tree

13 files changed

+319
-14
lines changed

13 files changed

+319
-14
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ arrow-flight = { version = "55.1.0", features = [
9898
] }
9999
arrow-ipc = { version = "55.0.0", default-features = false, features = [
100100
"lz4",
101+
"zstd",
101102
] }
102103
arrow-ord = { version = "55.0.0", default-features = false }
103104
arrow-schema = { version = "55.0.0", default-features = false }

datafusion/common/src/config.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Runtime configuration, via [`ConfigOptions`]
1919
20+
use arrow_ipc::CompressionType;
21+
2022
use crate::error::_config_err;
2123
use crate::parsers::CompressionTypeVariant;
2224
use crate::utils::get_available_parallelism;
@@ -274,6 +276,61 @@ config_namespace! {
274276
}
275277
}
276278

279+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
280+
pub enum SpillCompression {
281+
Zstd,
282+
Lz4Frame,
283+
#[default]
284+
Uncompressed,
285+
}
286+
287+
impl FromStr for SpillCompression {
288+
type Err = DataFusionError;
289+
290+
fn from_str(s: &str) -> Result<Self, Self::Err> {
291+
match s.to_ascii_lowercase().as_str() {
292+
"zstd" => Ok(Self::Zstd),
293+
"lz4_frame" => Ok(Self::Lz4Frame),
294+
"uncompressed" | "" => Ok(Self::Uncompressed),
295+
other => Err(DataFusionError::Configuration(format!(
296+
"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
297+
))),
298+
}
299+
}
300+
}
301+
302+
impl ConfigField for SpillCompression {
303+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
304+
v.some(key, self, description)
305+
}
306+
307+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
308+
*self = SpillCompression::from_str(value)?;
309+
Ok(())
310+
}
311+
}
312+
313+
impl Display for SpillCompression {
314+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315+
let str = match self {
316+
Self::Zstd => "zstd",
317+
Self::Lz4Frame => "lz4_frame",
318+
Self::Uncompressed => "uncompressed",
319+
};
320+
write!(f, "{str}")
321+
}
322+
}
323+
324+
impl From<SpillCompression> for Option<CompressionType> {
325+
fn from(c: SpillCompression) -> Self {
326+
match c {
327+
SpillCompression::Zstd => Some(CompressionType::ZSTD),
328+
SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
329+
SpillCompression::Uncompressed => None,
330+
}
331+
}
332+
}
333+
277334
config_namespace! {
278335
/// Options related to query execution
279336
///
@@ -330,6 +387,16 @@ config_namespace! {
330387
/// the new schema verification step.
331388
pub skip_physical_aggregate_schema_check: bool, default = false
332389

390+
/// Sets the compression codec used when spilling data to disk.
391+
///
392+
/// Since datafusion writes spill files using the Arrow IPC Stream format,
393+
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
394+
/// Valid values are: uncompressed, lz4_frame, zstd.
395+
/// Note: lz4_frame offers faster (de)compression, but typically results in
396+
/// larger spill files. In contrast, zstd achieves
397+
/// higher compression ratios at the cost of slower (de)compression speed.
398+
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
399+
333400
/// Specifies the reserved memory for each spillable sort operation to
334401
/// facilitate an in-memory merge.
335402
///

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use arrow::compute::SortOptions;
2828
use arrow::datatypes::{Int32Type, SchemaRef};
2929
use arrow_schema::{DataType, Field, Schema};
3030
use datafusion::assert_batches_eq;
31+
use datafusion::config::SpillCompression;
3132
use datafusion::datasource::memory::MemorySourceConfig;
3233
use datafusion::datasource::source::DataSourceExec;
3334
use datafusion::datasource::{MemTable, TableProvider};
@@ -545,10 +546,11 @@ async fn test_external_sort_zero_merge_reservation() {
545546
// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
546547
// ------------------------------------------------------------------
547548

548-
// Create a new `SessionContext` with speicified disk limit and memory pool limit
549+
// Create a new `SessionContext` with the specified disk limit, memory pool limit, and spill compression codec
549550
async fn setup_context(
550551
disk_limit: u64,
551552
memory_pool_limit: usize,
553+
spill_compression: SpillCompression,
552554
) -> Result<SessionContext> {
553555
let disk_manager = DiskManagerBuilder::default()
554556
.with_mode(DiskManagerMode::OsTmpDirectory)
@@ -570,6 +572,7 @@ async fn setup_context(
570572
let config = SessionConfig::new()
571573
.with_sort_spill_reservation_bytes(64 * 1024) // 64KB
572574
.with_sort_in_place_threshold_bytes(0)
575+
.with_spill_compression(spill_compression)
573576
.with_batch_size(64) // To reduce test memory usage
574577
.with_target_partitions(1);
575578

@@ -580,7 +583,8 @@ async fn setup_context(
580583
/// (specified by `max_temp_directory_size` in `DiskManager`)
581584
#[tokio::test]
582585
async fn test_disk_spill_limit_reached() -> Result<()> {
583-
let ctx = setup_context(1024 * 1024, 1024 * 1024).await?; // 1MB disk limit, 1MB memory limit
586+
let spill_compression = SpillCompression::Uncompressed;
587+
let ctx = setup_context(1024 * 1024, 1024 * 1024, spill_compression).await?; // 1MB disk limit, 1MB memory limit
584588

585589
let df = ctx
586590
.sql("select * from generate_series(1, 1000000000000) as t1(v1) order by v1")
@@ -602,7 +606,8 @@ async fn test_disk_spill_limit_reached() -> Result<()> {
602606
#[tokio::test]
603607
async fn test_disk_spill_limit_not_reached() -> Result<()> {
604608
let disk_spill_limit = 1024 * 1024; // 1MB
605-
let ctx = setup_context(disk_spill_limit, 128 * 1024).await?; // 1MB disk limit, 128KB memory limit
609+
let spill_compression = SpillCompression::Uncompressed;
610+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit
606611

607612
let df = ctx
608613
.sql("select * from generate_series(1, 10000) as t1(v1) order by v1")
@@ -630,6 +635,77 @@ async fn test_disk_spill_limit_not_reached() -> Result<()> {
630635
Ok(())
631636
}
632637

638+
/// External query should succeed using zstd as spill compression codec and
639+
/// all temporary spill files are properly cleaned up after execution.
640+
/// Note: This test does not inspect file contents (e.g. magic number),
641+
/// as spill files are automatically deleted on drop.
642+
#[tokio::test]
643+
async fn test_spill_file_compressed_with_zstd() -> Result<()> {
644+
let disk_spill_limit = 1024 * 1024; // 1MB
645+
let spill_compression = SpillCompression::Zstd;
646+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, zstd
647+
648+
let df = ctx
649+
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
650+
.await
651+
.unwrap();
652+
let plan = df.create_physical_plan().await.unwrap();
653+
654+
let task_ctx = ctx.task_ctx();
655+
let _ = collect_batches(Arc::clone(&plan), task_ctx)
656+
.await
657+
.expect("Query execution failed");
658+
659+
let spill_count = plan.metrics().unwrap().spill_count().unwrap();
660+
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
661+
662+
println!("spill count {spill_count}");
663+
assert!(spill_count > 0);
664+
assert!((spilled_bytes as u64) < disk_spill_limit);
665+
666+
// Verify that all temporary files have been properly cleaned up by checking
667+
// that the total disk usage tracked by the disk manager is zero
668+
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
669+
assert_eq!(current_disk_usage, 0);
670+
671+
Ok(())
672+
}
673+
674+
/// External query should succeed using lz4_frame as spill compression codec and
675+
/// all temporary spill files are properly cleaned up after execution.
676+
/// Note: This test does not inspect file contents (e.g. magic number),
677+
/// as spill files are automatically deleted on drop.
678+
#[tokio::test]
679+
async fn test_spill_file_compressed_with_lz4_frame() -> Result<()> {
680+
let disk_spill_limit = 1024 * 1024; // 1MB
681+
let spill_compression = SpillCompression::Lz4Frame;
682+
let ctx = setup_context(disk_spill_limit, 128 * 1024, spill_compression).await?; // 1MB disk limit, 128KB memory limit, lz4_frame
683+
684+
let df = ctx
685+
.sql("select * from generate_series(1, 100000) as t1(v1) order by v1")
686+
.await
687+
.unwrap();
688+
let plan = df.create_physical_plan().await.unwrap();
689+
690+
let task_ctx = ctx.task_ctx();
691+
let _ = collect_batches(Arc::clone(&plan), task_ctx)
692+
.await
693+
.expect("Query execution failed");
694+
695+
let spill_count = plan.metrics().unwrap().spill_count().unwrap();
696+
let spilled_bytes = plan.metrics().unwrap().spilled_bytes().unwrap();
697+
698+
println!("spill count {spill_count}");
699+
assert!(spill_count > 0);
700+
assert!((spilled_bytes as u64) < disk_spill_limit);
701+
702+
// Verify that all temporary files have been properly cleaned up by checking
703+
// that the total disk usage tracked by the disk manager is zero
704+
let current_disk_usage = ctx.runtime_env().disk_manager.used_disk_space();
705+
assert_eq!(current_disk_usage, 0);
706+
707+
Ok(())
708+
}
633709
/// Run the query with the specified memory limit,
634710
/// and verifies the expected errors are returned
635711
#[derive(Clone, Debug)]

datafusion/execution/src/config.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{
2323
};
2424

2525
use datafusion_common::{
26-
config::{ConfigExtension, ConfigOptions},
26+
config::{ConfigExtension, ConfigOptions, SpillCompression},
2727
Result, ScalarValue,
2828
};
2929

@@ -258,6 +258,11 @@ impl SessionConfig {
258258
self.options.execution.collect_statistics
259259
}
260260

261+
/// Compression codec for spill file
262+
pub fn spill_compression(&self) -> SpillCompression {
263+
self.options.execution.spill_compression
264+
}
265+
261266
/// Selects a name for the default catalog and schema
262267
pub fn with_default_catalog_and_schema(
263268
mut self,
@@ -421,6 +426,14 @@ impl SessionConfig {
421426
self
422427
}
423428

429+
/// Set the compression codec [`spill_compression`] used when spilling data to disk.
430+
///
431+
/// [`spill_compression`]: datafusion_common::config::ExecutionOptions::spill_compression
432+
pub fn with_spill_compression(mut self, spill_compression: SpillCompression) -> Self {
433+
self.options.execution.spill_compression = spill_compression;
434+
self
435+
}
436+
424437
/// Set the size of [`sort_in_place_threshold_bytes`] to control
425438
/// how sort does things.
426439
///

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,8 @@ impl GroupedHashAggregateStream {
552552
context.runtime_env(),
553553
metrics::SpillMetrics::new(&agg.metrics, partition),
554554
Arc::clone(&partial_agg_schema),
555-
);
555+
)
556+
.with_compression_type(context.session_config().spill_compression());
556557

557558
let spill_state = SpillState {
558559
spills: vec![],

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use arrow::compute::{
6161
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
6262
use arrow::error::ArrowError;
6363
use arrow::ipc::reader::StreamReader;
64+
use datafusion_common::config::SpillCompression;
6465
use datafusion_common::{
6566
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
6667
JoinType, NullEquality, Result,
@@ -500,6 +501,7 @@ impl ExecutionPlan for SortMergeJoinExec {
500501

501502
// create join stream
502503
Ok(Box::pin(SortMergeJoinStream::try_new(
504+
context.session_config().spill_compression(),
503505
Arc::clone(&self.schema),
504506
self.sort_options.clone(),
505507
self.null_equality,
@@ -1324,6 +1326,8 @@ impl Stream for SortMergeJoinStream {
13241326
impl SortMergeJoinStream {
13251327
#[allow(clippy::too_many_arguments)]
13261328
pub fn try_new(
1329+
// Configured via `datafusion.execution.spill_compression`.
1330+
spill_compression: SpillCompression,
13271331
schema: SchemaRef,
13281332
sort_options: Vec<SortOptions>,
13291333
null_equality: NullEquality,
@@ -1344,7 +1348,8 @@ impl SortMergeJoinStream {
13441348
Arc::clone(&runtime_env),
13451349
join_metrics.spill_metrics.clone(),
13461350
Arc::clone(&buffered_schema),
1347-
);
1351+
)
1352+
.with_compression_type(spill_compression);
13481353
Ok(Self {
13491354
state: SortMergeJoinState::Init,
13501355
sort_options,

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::{
4848
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
4949
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
5050
use arrow::datatypes::SchemaRef;
51+
use datafusion_common::config::SpillCompression;
5152
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
5253
use datafusion_execution::disk_manager::RefCountedTempFile;
5354
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -258,6 +259,8 @@ impl ExternalSorter {
258259
batch_size: usize,
259260
sort_spill_reservation_bytes: usize,
260261
sort_in_place_threshold_bytes: usize,
262+
// Configured via `datafusion.execution.spill_compression`.
263+
spill_compression: SpillCompression,
261264
metrics: &ExecutionPlanMetricsSet,
262265
runtime: Arc<RuntimeEnv>,
263266
) -> Result<Self> {
@@ -274,7 +277,8 @@ impl ExternalSorter {
274277
Arc::clone(&runtime),
275278
metrics.spill_metrics.clone(),
276279
Arc::clone(&schema),
277-
);
280+
)
281+
.with_compression_type(spill_compression);
278282

279283
Ok(Self {
280284
schema,
@@ -1173,6 +1177,7 @@ impl ExecutionPlan for SortExec {
11731177
context.session_config().batch_size(),
11741178
execution_options.sort_spill_reservation_bytes,
11751179
execution_options.sort_in_place_threshold_bytes,
1180+
context.session_config().spill_compression(),
11761181
&self.metrics_set,
11771182
context.runtime_env(),
11781183
)?;

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ impl InProgressSpillFile {
6767
self.writer = Some(IPCStreamWriter::new(
6868
in_progress_file.path(),
6969
schema.as_ref(),
70+
self.spill_writer.compression,
7071
)?);
7172

7273
// Update metrics

0 commit comments

Comments
 (0)