Merged
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -261,10 +261,19 @@ config_namespace! {
/// in parallel using the provided `target_partitions` level
pub repartition_aggregations: bool, default = true

/// Minimum total file size, in bytes, required to perform file scan repartitioning.
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// When set to true, file groups will be repartitioned to achieve maximum parallelism.
/// Currently this is supported only for the Parquet format, in which case
/// multiple row groups from the same file may be read concurrently. If false,
/// row groups within a file are read serially, though different files may still be read in parallel.
pub repartition_file_scans: bool, default = false

/// Should DataFusion repartition data using the partitions keys to execute window
/// functions in parallel using the provided `target_partitions` level
pub repartition_windows: bool, default = true
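The new options sit alongside the existing `repartition_*` flags in the `optimizer` namespace. A minimal sketch of toggling them programmatically, mirroring how the test macro later in this PR configures `ConfigOptions` (the `datafusion_common` import path is assumed):

```rust
use datafusion_common::config::ConfigOptions;

fn main() {
    let mut config = ConfigOptions::new();
    // Allow Parquet scans to be split into byte ranges across partitions,
    // once the combined file size reaches 1 MiB.
    config.optimizer.repartition_file_scans = true;
    config.optimizer.repartition_file_min_size = 1024 * 1024;
    config.execution.target_partitions = 8;
}
```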
12 changes: 12 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -1251,6 +1251,18 @@ impl SessionConfig {
self
}

/// Sets the minimum total file size, in bytes, for repartitioning file scans
pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
self.options.optimizer.repartition_file_min_size = size;
self
}

/// Enables or disables the use of repartitioning for file scans
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_file_scans = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_windows = enabled;
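With the two new builders, enabling file-scan repartitioning on a session looks like the following sketch (`with_target_partitions` and `SessionContext::with_config` are assumed from the existing API):

```rust
use datafusion::execution::context::{SessionConfig, SessionContext};

// Scans over Parquet files totalling at least 10 MiB may be split into
// byte ranges and read by up to 8 partitions concurrently.
let config = SessionConfig::new()
    .with_target_partitions(8)
    .with_repartition_file_scans(true)
    .with_repartition_file_min_size(10 * 1024 * 1024);
let ctx = SessionContext::with_config(config);
```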
261 changes: 252 additions & 9 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -23,7 +23,8 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
file_format::ParquetExec, repartition::RepartitionExec,
with_new_children_if_necessary, ExecutionPlan,
};

/// Optimizer that introduces repartition to introduce more
@@ -167,6 +168,8 @@ fn optimize_partitions(
is_root: bool,
can_reorder: bool,
would_benefit: bool,
repartition_file_scans: bool,
repartition_file_min_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
// Recurse into children bottom-up (attempt to repartition as
// early as possible)
@@ -199,6 +202,8 @@ fn optimize_partitions(
false, // child is not root
can_reorder_child,
plan.benefits_from_input_partitioning(),
repartition_file_scans,
repartition_file_min_size,
)
})
.collect::<Result<_>>()?;
@@ -227,14 +232,28 @@ fn optimize_partitions(
could_repartition = false;
}

if would_benefit && could_repartition && can_reorder {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
)?))
} else {
Ok(new_plan)
let repartition_allowed = would_benefit && could_repartition && can_reorder;

// If repartitioning is not allowed, return the plan as is
if !repartition_allowed {
return Ok(new_plan);
}

// For ParquetExec, return an internally repartitioned version of the plan if `repartition_file_scans` is enabled
if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
if repartition_file_scans {
return Ok(Arc::new(
parquet_exec
.get_repartitioned(target_partitions, repartition_file_min_size),
));
}
}

// Otherwise, return the plan wrapped in a RepartitionExec
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
)?))
}
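The expected plans in the tests below show what `ParquetExec::get_repartitioned` produces. A simplified, hypothetical model of that byte-range splitting (names and types invented here; not the actual implementation) is:

```rust
/// A byte range of a single file assigned to one output partition
/// (hypothetical stand-in for a `PartitionedFile` carrying a range).
#[derive(Debug, Clone)]
struct FileRange {
    file: String,
    start: u64,
    end: u64,
}

/// Splits `(file name, size)` pairs into `target` groups of roughly equal
/// total byte count, cutting files at range boundaries where needed.
fn split_into_ranges(files: &[(String, u64)], target: usize) -> Vec<Vec<FileRange>> {
    let total: u64 = files.iter().map(|(_, size)| *size).sum();
    // Each output partition receives about total/target bytes (ceiling division).
    let chunk = (total + target as u64 - 1) / target as u64;
    let mut groups: Vec<Vec<FileRange>> = vec![Vec::new(); target];
    let (mut group, mut left_in_group) = (0usize, chunk);
    for (name, size) in files {
        let mut offset = 0;
        while offset < *size {
            let take = left_in_group.min(*size - offset);
            groups[group].push(FileRange {
                file: name.clone(),
                start: offset,
                end: offset + take,
            });
            offset += take;
            left_in_group -= take;
            if left_in_group == 0 && group + 1 < target {
                group += 1;
                left_in_group = chunk;
            }
        }
    }
    groups
}
```

For files `[("x", 100), ("y", 200)]` and `target = 4` this yields `[[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]`, matching the expected plan in `parallelization_two_partitions_into_four` below.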

/// Returns true if `plan` requires any of inputs to be sorted in some
@@ -253,6 +272,8 @@ impl PhysicalOptimizerRule for Repartition {
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.execution.target_partitions;
let enabled = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
// Don't run optimizer if target_partitions == 1
if !enabled || target_partitions == 1 {
Ok(plan)
@@ -266,6 +287,8 @@ impl PhysicalOptimizerRule for Repartition {
is_root,
can_reorder,
would_benefit,
repartition_file_scans,
repartition_file_min_size,
)
}
}
@@ -331,6 +354,28 @@ mod tests {
))
}

/// Creates an unsorted parquet exec over two files / partitions
fn parquet_exec_two_partitions() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups: vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 200)],
],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
None,
None,
))
}

// Creates a sorted parquet exec
fn parquet_exec_sorted() -> Arc<ParquetExec> {
let sort_exprs = vec![PhysicalSortExpr {
@@ -448,10 +493,16 @@ mod tests {
/// Runs the repartition optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;

// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
@@ -846,6 +897,198 @@ mod tests {
Ok(())
}

#[test]
Contributor comment: Love the test coverage
fn parallelization_single_partition() -> Result<()> {
let plan = aggregate(parquet_exec());

let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_two_partitions() -> Result<()> {
let plan = aggregate(parquet_exec_two_partitions());

let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Plan already has two partitions
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_two_partitions_into_four() -> Result<()> {
let plan = aggregate(parquet_exec_two_partitions());

let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Multiple source files split across partitions
"ParquetExec: limit=None, partitions={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
Contributor comment: that is quite clever that the partitions have different parts of the same file 👍

];

assert_optimized!(expected, plan, 4, true, 10);
Ok(())
}

#[test]
fn parallelization_sorted_limit() -> Result<()> {
let plan = limit_exec(sort_exec(parquet_exec(), false));

let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: [c1@0 ASC]",
// Doesn't parallelize for SortExec without preserve_partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_limit_with_filter() -> Result<()> {
let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));

let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// data is sorted so we can't repartition here; even though the
// filter would benefit from parallelism, the answers might be wrong
"SortExec: [c1@0 ASC]",
// SortExec doesn't benefit from input partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_ignores_limit() -> Result<()> {
let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));

let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitioning - no parallelism
"LocalLimitExec: fetch=100",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_union_inputs() -> Result<()> {
let plan = union_exec(vec![parquet_exec(); 5]);

let expected = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
// sort preserving merge of already sorted input,
let plan = sort_preserving_merge_exec(parquet_exec_sorted());

// parallelization could potentially break the sort order
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
// 2 sorted parquet files unioned (partitions are concatenated, sort is preserved)
let input = union_exec(vec![parquet_exec_sorted(); 2]);
let plan = sort_preserving_merge_exec(input);

// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_does_not_destroy_sort() -> Result<()> {
// SortRequired
// Parquet(sorted)

let plan = sort_required_exec(parquet_exec_sorted());

// no parallelization to preserve sort order
let expected = &[
"SortRequiredExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_ignores_transitively_with_projection() -> Result<()> {
// sorted input
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));

// data should not be repartitioned / resorted
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

/// Models operators like BoundedWindowExec that require an input
/// ordering but are easy to construct
#[derive(Debug)]
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -208,6 +208,10 @@ impl<'a> Display for FileGroupsDisplay<'a> {
first_file = false;

write!(f, "{}", pf.object_meta.location.as_ref())?;

if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
}
}
write!(f, "]")?;
}
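With a range present, `FileGroupsDisplay` now renders each file as `path:start..end`, so a split single-file scan displays as, for example:

```text
ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]
```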