-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Minor: Improve comments in EnforceDistribution tests #8474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { | |
/// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering: | ||
/// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan. | ||
/// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan. | ||
/// Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan. | ||
/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan. | ||
/// | ||
/// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering: | ||
/// Requirements can not be satisfied, clear all the requirements, return the unchanged plan. | ||
|
@@ -928,7 +928,7 @@ fn add_roundrobin_on_top( | |
// If any of the following conditions is true | ||
// - Preserving ordering is not helpful in terms of satisfying ordering requirements | ||
// - Usage of order preserving variants is not desirable | ||
// (determined by flag `config.optimizer.bounded_order_preserving_variants`) | ||
// (determined by flag `config.optimizer.prefer_existing_sort`) | ||
let partitioning = Partitioning::RoundRobinBatch(n_target); | ||
let repartition = | ||
RepartitionExec::try_new(input, partitioning)?.with_preserve_order(); | ||
|
@@ -996,7 +996,7 @@ fn add_hash_on_top( | |
// - Preserving ordering is not helpful in terms of satisfying ordering | ||
// requirements. | ||
// - Usage of order preserving variants is not desirable (per the flag | ||
// `config.optimizer.bounded_order_preserving_variants`). | ||
// `config.optimizer.prefer_existing_sort`). | ||
let mut new_plan = if repartition_beneficial_stats { | ||
// Since hashing benefits from partitioning, add a round-robin repartition | ||
// before it: | ||
|
@@ -1045,7 +1045,7 @@ fn add_spm_on_top( | |
// If any of the following conditions is true | ||
// - Preserving ordering is not helpful in terms of satisfying ordering requirements | ||
// - Usage of order preserving variants is not desirable | ||
// (determined by flag `config.optimizer.bounded_order_preserving_variants`) | ||
// (determined by flag `config.optimizer.prefer_existing_sort`) | ||
let should_preserve_ordering = input.output_ordering().is_some(); | ||
let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering { | ||
let existing_ordering = input.output_ordering().unwrap_or(&[]); | ||
|
@@ -2026,15 +2026,15 @@ pub(crate) mod tests { | |
fn ensure_distribution_helper( | ||
plan: Arc<dyn ExecutionPlan>, | ||
target_partitions: usize, | ||
bounded_order_preserving_variants: bool, | ||
prefer_existing_sort: bool, | ||
) -> Result<Arc<dyn ExecutionPlan>> { | ||
let distribution_context = DistributionContext::new(plan); | ||
let mut config = ConfigOptions::new(); | ||
config.execution.target_partitions = target_partitions; | ||
config.optimizer.enable_round_robin_repartition = false; | ||
config.optimizer.repartition_file_scans = false; | ||
config.optimizer.repartition_file_min_size = 1024; | ||
config.optimizer.prefer_existing_sort = bounded_order_preserving_variants; | ||
config.optimizer.prefer_existing_sort = prefer_existing_sort; | ||
ensure_distribution(distribution_context, &config).map(|item| item.into().plan) | ||
} | ||
|
||
|
@@ -2056,23 +2056,33 @@ pub(crate) mod tests { | |
} | ||
|
||
/// Runs the repartition optimizer and asserts the plan against the expected | ||
/// Arguments | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found it very hard to read some of the tests as a line like this
As I had to go back to the macro definitions several times to remember what the |
||
/// * `EXPECTED_LINES` - Expected output plan | ||
/// * `PLAN` - Input plan | ||
/// * `FIRST_ENFORCE_DIST` - | ||
/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting) | ||
/// false: else runs (EnforceSorting, EnforceDistribution, EnforceDistribution) | ||
/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted | ||
/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to | ||
/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans | ||
/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition | ||
macro_rules! assert_optimized { | ||
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { | ||
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024); | ||
}; | ||
|
||
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => { | ||
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024); | ||
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => { | ||
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024); | ||
}; | ||
|
||
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { | ||
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { | ||
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); | ||
|
||
let mut config = ConfigOptions::new(); | ||
config.execution.target_partitions = $TARGET_PARTITIONS; | ||
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; | ||
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; | ||
config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS; | ||
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT; | ||
|
||
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade | ||
// because they were written prior to the separation of `BasicEnforcement` into | ||
|
@@ -3294,7 +3304,7 @@ pub(crate) mod tests { | |
]; | ||
assert_optimized!(expected, exec, true); | ||
// In this case preserving ordering through order preserving operators is not desirable | ||
// (according to flag: bounded_order_preserving_variants) | ||
// (according to flag: PREFER_EXISTING_SORT) | ||
// hence in this case ordering lost during CoalescePartitionsExec and re-introduced with | ||
// SortExec at the top. | ||
let expected = &[ | ||
|
@@ -4341,7 +4351,7 @@ pub(crate) mod tests { | |
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", | ||
]; | ||
|
||
// last flag sets config.optimizer.bounded_order_preserving_variants | ||
// last flag sets config.optimizer.PREFER_EXISTING_SORT | ||
assert_optimized!(expected, physical_plan.clone(), true, true); | ||
assert_optimized!(expected, physical_plan, false, true); | ||
|
||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this config option was renamed to prefer_existing_sort but some of the comments and variable names refer to the old name