Skip to content

Commit 4a6acb4

Browse files
committed
Add compression option to SpillManager
1 parent 3a312a9 commit 4a6acb4

File tree

9 files changed

+153
-17
lines changed

9 files changed

+153
-17
lines changed

datafusion/common/src/config.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
//! Runtime configuration, via [`ConfigOptions`]
1919
20+
use arrow_ipc::CompressionType;
21+
2022
use crate::error::_config_err;
2123
use crate::parsers::CompressionTypeVariant;
2224
use crate::utils::get_available_parallelism;
@@ -274,6 +276,61 @@ config_namespace! {
274276
}
275277
}
276278

279+
/// Compression codec applied to record batches written to spill files.
///
/// Set via the `spill_compression` execution config option; see the
/// `FromStr` impl for the accepted string forms.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SpillCompression {
    /// Zstandard compression
    Zstd,
    /// LZ4 frame-format compression
    // NOTE(review): `Lz4_frame` is not UpperCamelCase; renaming it to
    // `Lz4Frame` would break the public API, so the lint is silenced here.
    #[allow(non_camel_case_types)]
    Lz4_frame,
    /// No compression — the config default (see `spill_compression` option)
    #[default]
    Uncompressed,
}
286+
287+
impl FromStr for SpillCompression {
288+
type Err = DataFusionError;
289+
290+
fn from_str(s: &str) -> Result<Self, Self::Err> {
291+
match s.to_ascii_lowercase().as_str() {
292+
"zstd" => Ok(Self::Zstd),
293+
"lz4_frame" => Ok(Self::Lz4_frame),
294+
"uncompressed" | "" => Ok(Self::Uncompressed),
295+
other => Err(DataFusionError::Execution(format!(
296+
"Invalid Spill file compression type: {other}. Expected one of: zstd, lz4, uncompressed"
297+
))),
298+
}
299+
}
300+
}
301+
302+
impl ConfigField for SpillCompression {
303+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
304+
v.some(key, self, description)
305+
}
306+
307+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
308+
*self = SpillCompression::from_str(value)?;
309+
Ok(())
310+
}
311+
}
312+
313+
impl Display for SpillCompression {
314+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315+
let str = match self {
316+
Self::Zstd => "Zstd",
317+
Self::Lz4_frame => "Lz4",
318+
Self::Uncompressed => "",
319+
};
320+
write!(f, "{str}")
321+
}
322+
}
323+
324+
impl From<SpillCompression> for Option<CompressionType> {
325+
fn from(c: SpillCompression) -> Self {
326+
match c {
327+
SpillCompression::Zstd => Some(CompressionType::ZSTD),
328+
SpillCompression::Lz4_frame => Some(CompressionType::LZ4_FRAME),
329+
SpillCompression::Uncompressed => None,
330+
}
331+
}
332+
}
333+
277334
config_namespace! {
278335
/// Options related to query execution
279336
///
@@ -330,6 +387,11 @@ config_namespace! {
330387
/// the new schema verification step.
331388
pub skip_physical_aggregate_schema_check: bool, default = false
332389

390+
/// TODO: we may need to support a compression level setting; if so, config
/// parsing may need to move elsewhere.
391+
/// Sets default compression codec for spill file.
392+
/// Valid values are: uncompressed, zstd, lz4_frame
393+
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
394+
333395
/// Specifies the reserved memory for each spillable sort operation to
334396
/// facilitate an in-memory merge.
335397
///

datafusion/execution/src/config.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::{
2323
};
2424

2525
use datafusion_common::{
26-
config::{ConfigExtension, ConfigOptions},
26+
config::{ConfigExtension, ConfigOptions, SpillCompression},
2727
Result, ScalarValue,
2828
};
2929

@@ -258,6 +258,11 @@ impl SessionConfig {
258258
self.options.execution.collect_statistics
259259
}
260260

261+
/// Compression codec for spill file
262+
pub fn spill_compression(&self) -> SpillCompression {
263+
self.options.execution.spill_compression
264+
}
265+
261266
/// Selects a name for the default catalog and schema
262267
pub fn with_default_catalog_and_schema(
263268
mut self,

datafusion/physical-plan/benches/spill_io.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use arrow::array::{
2020
};
2121
use arrow::datatypes::{DataType, Field, Schema};
2222
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
23+
use datafusion_common::config::SpillCompression;
2324
use datafusion_execution::runtime_env::RuntimeEnv;
2425
use datafusion_physical_plan::common::collect;
2526
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
@@ -82,7 +83,7 @@ fn bench_spill_io(c: &mut Criterion) {
8283
Field::new("c2", DataType::Date32, true),
8384
Field::new("c3", DataType::Decimal128(11, 2), true),
8485
]));
85-
let spill_manager = SpillManager::new(env, metrics, schema);
86+
let spill_manager = SpillManager::new(env, metrics, schema, SpillCompression::Uncompressed);
8687

8788
let mut group = c.benchmark_group("spill_io");
8889
let rt = Runtime::new().unwrap();

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ impl GroupedHashAggregateStream {
552552
context.runtime_env(),
553553
metrics::SpillMetrics::new(&agg.metrics, partition),
554554
Arc::clone(&partial_agg_schema),
555+
context.session_config().spill_compression(),
555556
);
556557

557558
let spill_state = SpillState {

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use arrow::compute::{
6161
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
6262
use arrow::error::ArrowError;
6363
use arrow::ipc::reader::StreamReader;
64+
use datafusion_common::config::SpillCompression;
6465
use datafusion_common::{
6566
exec_err, internal_err, not_impl_err, plan_err, DataFusionError, HashSet, JoinSide,
6667
JoinType, Result,
@@ -495,6 +496,7 @@ impl ExecutionPlan for SortMergeJoinExec {
495496

496497
// create join stream
497498
Ok(Box::pin(SortMergeJoinStream::try_new(
499+
context.session_config().spill_compression(),
498500
Arc::clone(&self.schema),
499501
self.sort_options.clone(),
500502
self.null_equals_null,
@@ -1319,6 +1321,7 @@ impl Stream for SortMergeJoinStream {
13191321
impl SortMergeJoinStream {
13201322
#[allow(clippy::too_many_arguments)]
13211323
pub fn try_new(
1324+
spill_compression: SpillCompression,
13221325
schema: SchemaRef,
13231326
sort_options: Vec<SortOptions>,
13241327
null_equals_null: bool,
@@ -1339,6 +1342,7 @@ impl SortMergeJoinStream {
13391342
Arc::clone(&runtime_env),
13401343
join_metrics.spill_metrics.clone(),
13411344
Arc::clone(&buffered_schema),
1345+
spill_compression,
13421346
);
13431347
Ok(Self {
13441348
state: SortMergeJoinState::Init,

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use crate::{
4747
use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
4848
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
4949
use arrow::datatypes::SchemaRef;
50+
use datafusion_common::config::SpillCompression;
5051
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
5152
use datafusion_execution::disk_manager::RefCountedTempFile;
5253
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -255,6 +256,7 @@ impl ExternalSorter {
255256
batch_size: usize,
256257
sort_spill_reservation_bytes: usize,
257258
sort_in_place_threshold_bytes: usize,
259+
spill_compression: SpillCompression,
258260
metrics: &ExecutionPlanMetricsSet,
259261
runtime: Arc<RuntimeEnv>,
260262
) -> Result<Self> {
@@ -271,6 +273,7 @@ impl ExternalSorter {
271273
Arc::clone(&runtime),
272274
metrics.spill_metrics.clone(),
273275
Arc::clone(&schema),
276+
spill_compression,
274277
);
275278

276279
Ok(Self {
@@ -1146,6 +1149,7 @@ impl ExecutionPlan for SortExec {
11461149
context.session_config().batch_size(),
11471150
execution_options.sort_spill_reservation_bytes,
11481151
execution_options.sort_in_place_threshold_bytes,
1152+
context.session_config().spill_compression(),
11491153
&self.metrics_set,
11501154
context.runtime_env(),
11511155
)?;

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@ impl InProgressSpillFile {
6464
if self.writer.is_none() {
6565
let schema = batch.schema();
6666
if let Some(ref in_progress_file) = self.in_progress_file {
67+
// TODO: revisit whether the compression option needs to be passed here again.
6768
self.writer = Some(IPCStreamWriter::new(
6869
in_progress_file.path(),
6970
schema.as_ref(),
71+
self.spill_writer.compression,
7072
)?);
7173

7274
// Update metrics

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ use std::task::{Context, Poll};
3030

3131
use arrow::array::ArrayData;
3232
use arrow::datatypes::{Schema, SchemaRef};
33+
use arrow::ipc::writer::IpcWriteOptions;
3334
use arrow::ipc::{reader::StreamReader, writer::StreamWriter};
35+
use arrow::ipc::{CompressionType, MetadataVersion};
3436
use arrow::record_batch::RecordBatch;
3537

38+
use datafusion_common::config::SpillCompression;
3639
use datafusion_common::{exec_datafusion_err, DataFusionError, HashSet, Result};
3740
use datafusion_common_runtime::SpawnedTask;
3841
use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -194,7 +197,8 @@ pub fn spill_record_batch_by_size(
194197
) -> Result<()> {
195198
let mut offset = 0;
196199
let total_rows = batch.num_rows();
197-
let mut writer = IPCStreamWriter::new(&path, schema.as_ref())?;
200+
let mut writer =
201+
IPCStreamWriter::new(&path, schema.as_ref(), SpillCompression::Uncompressed)?;
198202

199203
while offset < total_rows {
200204
let length = std::cmp::min(total_rows - offset, batch_size_rows);
@@ -292,15 +296,28 @@ struct IPCStreamWriter {
292296

293297
impl IPCStreamWriter {
294298
/// Create new writer
295-
pub fn new(path: &Path, schema: &Schema) -> Result<Self> {
299+
pub fn new(
300+
path: &Path,
301+
schema: &Schema,
302+
compression_type: SpillCompression,
303+
) -> Result<Self> {
296304
let file = File::create(path).map_err(|e| {
297305
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
298306
})?;
307+
308+
// TODO what should be default metadata version & alignment?
309+
let metadata_version = MetadataVersion::V5;
310+
let alignment = 8;
311+
let mut write_options =
312+
IpcWriteOptions::try_new(alignment, false, metadata_version)?;
313+
write_options = write_options.try_with_compression(compression_type.into())?;
314+
315+
let writer = StreamWriter::try_new_with_options(file, schema, write_options)?;
299316
Ok(Self {
300317
num_batches: 0,
301318
num_rows: 0,
302319
num_bytes: 0,
303-
writer: StreamWriter::try_new(file, schema)?,
320+
writer,
304321
})
305322
}
306323

@@ -362,7 +379,12 @@ mod tests {
362379
// Construct SpillManager
363380
let env = Arc::new(RuntimeEnv::default());
364381
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
365-
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
382+
let spill_manager = SpillManager::new(
383+
env,
384+
metrics,
385+
Arc::clone(&schema),
386+
SpillCompression::Uncompressed,
387+
);
366388

367389
let spill_file = spill_manager
368390
.spill_record_batch_and_finish(&[batch1, batch2], "Test")?
@@ -426,7 +448,12 @@ mod tests {
426448
// Construct SpillManager
427449
let env = Arc::new(RuntimeEnv::default());
428450
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
429-
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&dict_schema));
451+
let spill_manager = SpillManager::new(
452+
env,
453+
metrics,
454+
Arc::clone(&dict_schema),
455+
SpillCompression::Uncompressed,
456+
);
430457

431458
let num_rows = batch1.num_rows() + batch2.num_rows();
432459
let spill_file = spill_manager
@@ -454,7 +481,12 @@ mod tests {
454481
let schema = batch1.schema();
455482
let env = Arc::new(RuntimeEnv::default());
456483
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
457-
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
484+
let spill_manager = SpillManager::new(
485+
env,
486+
metrics,
487+
Arc::clone(&schema),
488+
SpillCompression::Uncompressed,
489+
);
458490

459491
let spill_file = spill_manager
460492
.spill_record_batch_by_size(&batch1, "Test Spill", 1)?
@@ -608,7 +640,12 @@ mod tests {
608640
Field::new("b", DataType::Utf8, false),
609641
]));
610642

611-
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
643+
let spill_manager = SpillManager::new(
644+
env,
645+
metrics,
646+
Arc::clone(&schema),
647+
SpillCompression::Uncompressed,
648+
);
612649

613650
let batch = RecordBatch::try_new(
614651
schema,
@@ -665,8 +702,12 @@ mod tests {
665702
Field::new("b", DataType::Utf8, false),
666703
]));
667704

668-
let spill_manager =
669-
Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema)));
705+
let spill_manager = Arc::new(SpillManager::new(
706+
env,
707+
metrics,
708+
Arc::clone(&schema),
709+
SpillCompression::Uncompressed,
710+
));
670711
let mut in_progress_file = spill_manager.create_in_progress_file("Test")?;
671712

672713
let batch1 = RecordBatch::try_new(
@@ -712,8 +753,12 @@ mod tests {
712753
Field::new("b", DataType::Utf8, false),
713754
]));
714755

715-
let spill_manager =
716-
Arc::new(SpillManager::new(env, metrics, Arc::clone(&schema)));
756+
let spill_manager = Arc::new(SpillManager::new(
757+
env,
758+
metrics,
759+
Arc::clone(&schema),
760+
SpillCompression::Uncompressed,
761+
));
717762

718763
// Test write empty batch with interface `InProgressSpillFile` and `append_batch()`
719764
let mut in_progress_file = spill_manager.create_in_progress_file("Test")?;
@@ -758,7 +803,12 @@ mod tests {
758803
// Construct SpillManager
759804
let env = Arc::new(RuntimeEnv::default());
760805
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
761-
let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema));
806+
let spill_manager = SpillManager::new(
807+
env,
808+
metrics,
809+
Arc::clone(&schema),
810+
SpillCompression::Uncompressed,
811+
);
762812
let batches: [_; 10] = std::array::from_fn(|_| batch.clone());
763813

764814
let spill_file_1 = spill_manager

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use arrow::datatypes::SchemaRef;
2323
use arrow::record_batch::RecordBatch;
2424
use datafusion_execution::runtime_env::RuntimeEnv;
2525

26-
use datafusion_common::Result;
26+
use datafusion_common::{config::SpillCompression, Result};
2727
use datafusion_execution::disk_manager::RefCountedTempFile;
2828
use datafusion_execution::SendableRecordBatchStream;
2929

@@ -44,16 +44,23 @@ pub struct SpillManager {
4444
schema: SchemaRef,
4545
/// Number of batches to buffer in memory during disk reads
4646
batch_read_buffer_capacity: usize,
47-
// TODO: Add general-purpose compression options
47+
/// General-purpose compression options applied to spill files
48+
pub(crate) compression: SpillCompression,
4849
}
4950

5051
impl SpillManager {
51-
pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
52+
pub fn new(
53+
env: Arc<RuntimeEnv>,
54+
metrics: SpillMetrics,
55+
schema: SchemaRef,
56+
compression: SpillCompression,
57+
) -> Self {
5258
Self {
5359
env,
5460
metrics,
5561
schema,
5662
batch_read_buffer_capacity: 2,
63+
compression,
5764
}
5865
}
5966

0 commit comments

Comments
 (0)