Skip to content

Commit f792cfa

Browse files
committed
test: reproducer of SanityCheck failure after EnforceSorting removes the coalesce added in the EnforceDistribution
1 parent 90280ea commit f792cfa

File tree

3 files changed

+100
-2
lines changed

3 files changed

+100
-2
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,7 +1584,7 @@ pub(crate) mod tests {
15841584
.build_arc()
15851585
}
15861586

1587-
fn parquet_exec_with_stats() -> Arc<ParquetExec> {
1587+
pub(crate) fn parquet_exec_with_stats() -> Arc<ParquetExec> {
15881588
let mut statistics = Statistics::new_unknown(&schema());
15891589
statistics.num_rows = Precision::Inexact(10);
15901590
statistics.column_statistics = column_stats();
@@ -1665,7 +1665,7 @@ pub(crate) mod tests {
16651665
)
16661666
}
16671667

1668-
fn projection_exec_with_alias(
1668+
pub(crate) fn projection_exec_with_alias(
16691669
input: Arc<dyn ExecutionPlan>,
16701670
alias_pairs: Vec<(String, String)>,
16711671
) -> Arc<dyn ExecutionPlan> {

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,11 @@ fn get_sort_exprs(
657657
#[cfg(test)]
658658
mod tests {
659659
use super::*;
660+
use crate::physical_optimizer::enforce_distribution::tests::{
661+
parquet_exec_with_stats, projection_exec_with_alias, schema,
662+
};
660663
use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
664+
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
661665
use crate::physical_optimizer::test_utils::{
662666
aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
663667
coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
@@ -675,7 +679,11 @@ mod tests {
675679
use datafusion_common::Result;
676680
use datafusion_expr::JoinType;
677681
use datafusion_physical_expr::expressions::{col, Column, NotExpr};
682+
use datafusion_physical_expr::PhysicalSortExpr;
678683
use datafusion_physical_optimizer::PhysicalOptimizerRule;
684+
use datafusion_physical_plan::aggregates::{
685+
AggregateExec, AggregateMode, PhysicalGroupBy,
686+
};
679687
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
680688

681689
use rstest::rstest;
@@ -2183,6 +2191,95 @@ mod tests {
21832191
Ok(())
21842192
}
21852193

2194+
fn single_partition_aggregate(
2195+
input: Arc<dyn ExecutionPlan>,
2196+
alias_pairs: Vec<(String, String)>,
2197+
) -> Arc<dyn ExecutionPlan> {
2198+
let schema = schema();
2199+
let group_by = alias_pairs
2200+
.iter()
2201+
.map(|(column, alias)| {
2202+
(col(column, &input.schema()).unwrap(), alias.to_string())
2203+
})
2204+
.collect::<Vec<_>>();
2205+
let group_by = PhysicalGroupBy::new_single(group_by);
2206+
2207+
Arc::new(
2208+
AggregateExec::try_new(
2209+
AggregateMode::SinglePartitioned,
2210+
group_by,
2211+
vec![],
2212+
vec![],
2213+
input,
2214+
schema,
2215+
)
2216+
.unwrap(),
2217+
)
2218+
}
2219+
2220+
#[tokio::test]
2221+
async fn test_preserve_needed_coalesce() -> Result<()> {
2222+
// Input to EnforceSorting, from our test case.
2223+
let plan = projection_exec_with_alias(
2224+
union_exec(vec![parquet_exec_with_stats(); 2]),
2225+
vec![
2226+
("a".to_string(), "a".to_string()),
2227+
("b".to_string(), "value".to_string()),
2228+
],
2229+
);
2230+
let plan = Arc::new(CoalescePartitionsExec::new(plan));
2231+
let schema = schema();
2232+
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
2233+
expr: col("a", &schema).unwrap(),
2234+
options: SortOptions::default(),
2235+
}]);
2236+
let plan: Arc<dyn ExecutionPlan> =
2237+
single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
2238+
let plan = sort_exec(sort_key, plan);
2239+
2240+
// Starting plan: as in our test case.
2241+
assert_eq!(
2242+
get_plan_string(&plan),
2243+
vec![
2244+
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2245+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2246+
" CoalescePartitionsExec",
2247+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2248+
" UnionExec",
2249+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2250+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2251+
],
2252+
);
2253+
2254+
let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
2255+
assert!(checker.is_ok());
2256+
2257+
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2258+
let optimizer = EnforceSorting::new();
2259+
let optimized = optimizer.optimize(plan, &Default::default())?;
2260+
assert_eq!(
2261+
get_plan_string(&optimized),
2262+
vec![
2263+
"SortPreservingMergeExec: [a@0 ASC]",
2264+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
2265+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2266+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2267+
" UnionExec",
2268+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2269+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2270+
],
2271+
);
2272+
2273+
// Plan is now invalid.
2274+
let checker = SanityCheckPlan::new();
2275+
let err = checker
2276+
.optimize(optimized, &Default::default())
2277+
.unwrap_err();
2278+
assert!(err.message().contains(" does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
2279+
2280+
Ok(())
2281+
}
2282+
21862283
#[tokio::test]
21872284
async fn test_coalesce_propagate() -> Result<()> {
21882285
let schema = create_test_schema()?;

datafusion/core/src/physical_optimizer/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
2929

3030
use datafusion_physical_expr::LexRequirement;
3131
use datafusion_physical_expr_common::sort_expr::LexOrdering;
32+
use datafusion_physical_plan::aggregates::AggregateExec;
3233
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3334
use datafusion_physical_plan::tree_node::PlanContext;
3435

0 commit comments

Comments
 (0)