Skip to content

Commit 7561c13

Browse files
committed
Chore: fix docs & fmt
1 parent 555877f commit 7561c13

File tree

6 files changed

+18
-17
lines changed

6 files changed

+18
-17
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ arrow-flight = { version = "55.1.0", features = [
9797
"flight-sql-experimental",
9898
] }
9999
arrow-ipc = { version = "55.0.0", default-features = false, features = [
100-
"lz4", "zstd",
100+
"lz4",
101+
"zstd",
101102
] }
102103
arrow-ord = { version = "55.0.0", default-features = false }
103104
arrow-schema = { version = "55.0.0", default-features = false }

datafusion/common/src/config.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,6 @@ config_namespace! {
276276
}
277277
}
278278

279-
// TODO where should we parse?
280279
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
281280
pub enum SpillCompression {
282281
Zstd,
@@ -387,9 +386,11 @@ config_namespace! {
387386
/// the new schema verification step.
388387
pub skip_physical_aggregate_schema_check: bool, default = false
389388

390-
/// TODO do we may need to support compression level setting Then we may change where parsing config happens
391-
/// Sets default compression codec for spill file.
392-
/// Valid values are: uncompressed, zstd, lz4_frame
389+
/// Sets the compression codec used when spilling data to disk.
390+
///
391+
/// Since DataFusion writes spill files using the Arrow IPC Stream format,
392+
/// only codecs supported by the Arrow IPC Stream Writer are allowed.
393+
/// Valid values are: uncompressed, lz4_frame, zstd
393394
pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed
394395

395396
/// Specifies the reserved memory for each spillable sort operation to

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ impl InProgressSpillFile {
6464
if self.writer.is_none() {
6565
let schema = batch.schema();
6666
if let Some(ref in_progress_file) = self.in_progress_file {
67-
// TODO hmm passing compresion option again?
6867
self.writer = Some(IPCStreamWriter::new(
6968
in_progress_file.path(),
7069
schema.as_ref(),

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,6 @@ impl IPCStreamWriter {
305305
exec_datafusion_err!("Failed to create partition file at {path:?}: {e:?}")
306306
})?;
307307

308-
// TODO what should be default metadata version & alignment?
309308
let metadata_version = MetadataVersion::V5;
310309
let alignment = 8;
311310
let mut write_options =
@@ -349,8 +348,7 @@ mod tests {
349348
use crate::metrics::SpillMetrics;
350349
use crate::spill::spill_manager::SpillManager;
351350
use crate::test::build_table_i32;
352-
use arrow::array::ArrayRef;
353-
use arrow::array::{Float64Array, Int32Array, ListArray, StringArray};
351+
use arrow::array::{ArrayRef, Float64Array, Int32Array, ListArray, StringArray};
354352
use arrow::compute::cast;
355353
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
356354
use arrow::record_batch::RecordBatch;
@@ -510,9 +508,9 @@ mod tests {
510508
Field::new("c", DataType::Int32, true),
511509
]));
512510

513-
let a: ArrayRef = Arc::new(StringArray::from_iter_values(
514-
std::iter::repeat("repeated").take(100),
515-
));
511+
let a: ArrayRef = Arc::new(StringArray::from_iter_values(std::iter::repeat_n(
512+
"repeated", 100,
513+
)));
516514
let b: ArrayRef = Arc::new(Int32Array::from(vec![1; 100]));
517515
let c: ArrayRef = Arc::new(Int32Array::from(vec![2; 100]));
518516

@@ -552,13 +550,13 @@ mod tests {
552550
let lz4_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
553551
let zstd_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
554552
let uncompressed_spill_manager = SpillManager::new(
555-
env.clone(),
553+
Arc::clone(&env),
556554
uncompressed_metrics,
557555
Arc::clone(&schema),
558556
SpillCompression::Uncompressed,
559557
);
560558
let lz4_spill_manager = SpillManager::new(
561-
env.clone(),
559+
Arc::clone(&env),
562560
lz4_metrics,
563561
Arc::clone(&schema),
564562
SpillCompression::Lz4Frame,
@@ -590,20 +588,19 @@ mod tests {
590588
assert!(uncompressed_spill_size > lz4_spill_size);
591589
assert!(uncompressed_spill_size > zstd_spill_size);
592590

593-
// TODO validate with function
594591
validate(
595592
&lz4_spill_manager,
596593
lz4_spill_file,
597594
num_rows,
598-
schema.clone(),
595+
Arc::clone(&schema),
599596
batch_count,
600597
)
601598
.await?;
602599
validate(
603600
&zstd_spill_manager,
604601
zstd_spill_file,
605602
num_rows,
606-
schema.clone(),
603+
Arc::clone(&schema),
607604
batch_count,
608605
)
609606
.await?;

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ datafusion.execution.skip_physical_aggregate_schema_check false
263263
datafusion.execution.soft_max_rows_per_output_file 50000000
264264
datafusion.execution.sort_in_place_threshold_bytes 1048576
265265
datafusion.execution.sort_spill_reservation_bytes 10485760
266+
datafusion.execution.spill_compression (empty)
266267
datafusion.execution.split_file_groups_by_statistics false
267268
datafusion.execution.target_partitions 7
268269
datafusion.execution.time_zone +00:00
@@ -374,6 +375,7 @@ datafusion.execution.skip_physical_aggregate_schema_check false When set to true
374375
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
375376
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
376377
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
378+
datafusion.execution.spill_compression (empty) Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd
377379
datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
378380
datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
379381
datafusion.execution.time_zone +00:00 The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
8383
| datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
8484
| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
8585
| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
86+
| datafusion.execution.spill_compression | | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd |
8687
| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
8788
| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
8889
| datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |

0 commit comments

Comments
 (0)