Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering:
/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan.
/// Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan.
///
/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering:
/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan.
Expand Down Expand Up @@ -928,7 +928,7 @@ fn add_roundrobin_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
// (determined by flag `config.optimizer.prefer_existing_sort`)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config option was renamed to `prefer_existing_sort`, but some of the comments and variable names still refer to the old name (`bounded_order_preserving_variants`).

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();
Expand Down Expand Up @@ -996,7 +996,7 @@ fn add_hash_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
// `config.optimizer.prefer_existing_sort`).
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
Expand Down Expand Up @@ -1045,7 +1045,7 @@ fn add_spm_on_top(
// If any of the following conditions is true
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
// (determined by flag `config.optimizer.prefer_existing_sort`)
let should_preserve_ordering = input.output_ordering().is_some();
let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering {
let existing_ordering = input.output_ordering().unwrap_or(&[]);
Expand Down Expand Up @@ -2026,15 +2026,15 @@ pub(crate) mod tests {
fn ensure_distribution_helper(
plan: Arc<dyn ExecutionPlan>,
target_partitions: usize,
bounded_order_preserving_variants: bool,
prefer_existing_sort: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
config.optimizer.enable_round_robin_repartition = false;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
}

Expand All @@ -2056,23 +2056,33 @@ pub(crate) mod tests {
}

/// Runs the repartition optimizer and asserts the plan against the expected output plan.
///
/// # Arguments
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found some of the tests very hard to read. For example, with a line like this:

        assert_optimized!(expected, physical_plan, false, true);

I had to go back to the macro definition several times to remember what the `false` and `true` arguments meant.

/// * `EXPECTED_LINES` - Expected output plan
/// * `PLAN` - Input plan
/// * `FIRST_ENFORCE_DIST` -
/// true: runs (EnforceDistribution, EnforceDistribution, EnforceSorting)
/// false: runs (EnforceSorting, EnforceDistribution, EnforceDistribution)
/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;

// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
Expand Down Expand Up @@ -3294,7 +3304,7 @@ pub(crate) mod tests {
];
assert_optimized!(expected, exec, true);
// In this case preserving ordering through order preserving operators is not desirable
// (according to flag: bounded_order_preserving_variants)
// (according to flag: `config.optimizer.prefer_existing_sort`)
// hence in this case ordering lost during CoalescePartitionsExec and re-introduced with
// SortExec at the top.
let expected = &[
Expand Down Expand Up @@ -4341,7 +4351,7 @@ pub(crate) mod tests {
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

// last flag sets config.optimizer.bounded_order_preserving_variants
// last flag sets config.optimizer.prefer_existing_sort
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);

Expand Down