
Rework distributed planning logic#259

Merged
gabotechs merged 11 commits into main from gabrielmusat/rework-distributed-planning-logic
Dec 24, 2025

Conversation

@gabotechs
Collaborator

@gabotechs gabotechs commented Dec 18, 2025

This is a full rewrite of the distributed planner that aims for two things:

  • Fixing some bugs and improvement points around how tasks are assigned to each stage and how leaf nodes are scaled up to account for greater parallelism.
  • Simplifying the code and making it more reliable by rendering invalid states unrepresentable.

How it worked before

This is how the distributed planner worked before:

  1. estimates distributed task counts for leaf nodes based on TaskEstimators, scaling them up for parallelism
  2. places "Pending" network boundaries in the appropriate places
  3. transitions those "Pending" network boundaries to "Ready", assigning them stages
  4. rolls back unnecessary network boundaries, or re-assigns them a different number of tasks

The order in which these steps happen is problematic, as it can lead to several undesirable situations:

  • A TaskEstimator might decide that a leaf node needs to run in 100 tasks, and it will be scaled up accordingly. Then, for whatever reason, a stage containing that leaf node decides to use just 1 task (e.g., it contains a CollectLeft HashJoin), and we end up with a x100 scaled leaf node running in just 1 task.
  • A TaskEstimator might decide that a leaf node needs to run in 100 tasks, and the plan has 3 stages, so the planner decides to use something like 100 tasks -> 90 tasks -> 80 tasks. Then the "Pending" to "Ready" transition happens, and it's decided that the middle stage is only going to run in 1 task, so we are left with 100 tasks -> 1 task -> 80 tasks. The middle stage is going to heavily bottleneck the query, so there's no point in using 80 tasks for the head stage.
  • Placing network boundaries before knowing the task count of each node forces us to represent a network boundary as an enum with two states: "Pending" and "Ready". This means that in a lot of places we need runtime checks that nodes are in the correct state, leading to potential bugs that only surface at runtime.
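The last point can be sketched as follows; `NetworkBoundary` and `stage_tasks` here are illustrative names for the pattern being described, not the actual types in the codebase:

```rust
// Hypothetical sketch of a two-state network boundary: every consumer must
// check at runtime that the boundary has already transitioned to "Ready".
#[derive(Debug)]
enum NetworkBoundary {
    // Placed in step 2, before task counts are known.
    Pending,
    // Filled in during step 3, once a stage is assigned.
    Ready { stage_id: usize, tasks: usize },
}

// Any code that needs the task count is forced into a fallible runtime check,
// which is exactly the class of bug the rewrite makes unrepresentable.
fn stage_tasks(boundary: &NetworkBoundary) -> Result<usize, String> {
    match boundary {
        NetworkBoundary::Pending => Err("network boundary is still Pending".to_string()),
        NetworkBoundary::Ready { tasks, .. } => Ok(*tasks),
    }
}
```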

How it works now

The distribution step is simplified, and just two things happen:

  1. the plan is annotated with the number of tasks that should be assigned to each node, and whether a network boundary should be placed. At this point the plan is left untouched; it's just annotated.
 ┌────────────────────┐ task_count: Maximum(1) (because we try to coalesce all partitions into 1)
 │ CoalescePartitions │ network_boundary: Some(Coalesce)
 └──────────▲─────────┘
            │
 ┌──────────┴─────────┐ task_count: Desired(3) (inherited from the child)
 │     Projection     │ network_boundary: None
 └──────────▲─────────┘
            │
 ┌──────────┴─────────┐ task_count: Desired(3) (as this node requires a network boundary below,
 │    Aggregation     │    and the stage below reduces the cardinality of the data because of the
 │       (final)      │    partial aggregation, we can choose a smaller amount of tasks)
 └──────────▲─────────┘ network_boundary: Some(Shuffle) (because the child is a repartition)
            │
 ┌──────────┴─────────┐ task_count: Desired(4) (inherited from the child)
 │    Repartition     │ network_boundary: None
 └──────────▲─────────┘
            │
 ┌──────────┴─────────┐ task_count: Desired(4) (inherited from the child)
 │    Aggregation     │ network_boundary: None
 │     (partial)      │
 └──────────▲─────────┘
            │
 ┌──────────┴─────────┐ task_count: Desired(4) (this was set by a TaskEstimator implementation)
 │   DataSourceExec   │ network_boundary: None
 └────────────────────┘
  2. the annotated plan is traversed recursively, and the network boundaries are placed in the appropriate places.

This allows us to remove the "Pending" and "Ready" states of the network boundaries: once they are placed, everything is set in stone, which greatly simplifies the code and removes the runtime checks for nodes being ready.
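As a rough sketch of the annotation shape shown in the diagram above (the names are illustrative, not the PR's actual types), each node carries a task count and an optional, fully-resolved boundary, so there is no intermediate state to check for:

```rust
// Hypothetical sketch of the per-node annotations from the diagram above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskCount {
    Maximum(usize), // a hard cap, e.g. CoalescePartitions must collapse to 1 task
    Desired(usize), // a preference inherited from children or from a TaskEstimator
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum BoundaryKind {
    Coalesce, // all-to-one boundary
    Shuffle,  // repartitioning boundary
}

// A boundary is either present with all its information, or absent: there is
// no "Pending" variant that downstream code could observe by mistake.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Annotation {
    task_count: TaskCount,
    network_boundary: Option<BoundaryKind>,
}

fn tasks(a: &Annotation) -> usize {
    match a.task_count {
        TaskCount::Maximum(n) | TaskCount::Desired(n) => n,
    }
}
```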

Results

We now perform better task assignment and better scale up leaf nodes. If there's a stage that can only run in one task and is going to bottleneck the query, there's no point in distributing the stages above it: that just adds latency from sending data over the wire without benefiting from all the available compute in the machines. We also perform a smarter scale-up of the leaf nodes, making sure that they scale up to the appropriate size based on the final task count assigned to the stage containing them:
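One way to picture the bottleneck rule, assuming task counts propagate from the leaf stage toward the head (this is a sketch of the policy described above, not the PR's implementation):

```rust
// Hedged sketch: walking stages from leaf to head, no stage is given more
// tasks than the minimum seen so far, so a 1-task stage caps every stage
// above it and we avoid pointless network fan-out.
fn cap_by_bottleneck(stages_leaf_to_head: &[usize]) -> Vec<usize> {
    let mut cap = usize::MAX;
    stages_leaf_to_head
        .iter()
        .map(|&t| {
            cap = cap.min(t);
            cap
        })
        .collect()
}
```

With the earlier example, `cap_by_bottleneck(&[100, 1, 80])` yields `[100, 1, 1]`: once the middle stage collapses to one task, the head stage is no longer scaled to 80.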

Comparison against main with TPCH SF1 on a local machine

==== Comparison with the previous benchmark from 2025-12-18 09:01:37 UTC ====
os:        macos
arch:      aarch64
cpu cores: 16
threads:   2 -> 2
workers:   8 -> 8
=============================================================================
 Query 1: prev= 247 ms, new=  90 ms, diff=2.74 faster ✅
 Query 2: prev=  75 ms, new=  75 ms, diff=1.00 slower ✖
 Query 3: prev= 105 ms, new= 106 ms, diff=1.01 slower ✖
 Query 4: prev=  95 ms, new=  71 ms, diff=1.34 faster ✅
 Query 5: prev= 188 ms, new= 152 ms, diff=1.24 faster ✅
 Query 6: prev=  76 ms, new=  26 ms, diff=2.92 faster ✅
 Query 7: prev= 213 ms, new= 163 ms, diff=1.31 faster ✅
 Query 8: prev= 154 ms, new= 149 ms, diff=1.03 faster ✔
 Query 9: prev= 205 ms, new= 198 ms, diff=1.04 faster ✔
Query 10: prev= 138 ms, new= 106 ms, diff=1.30 faster ✅
Query 11: prev=  41 ms, new=  41 ms, diff=1.00 slower ✖
Query 12: prev= 130 ms, new=  81 ms, diff=1.60 faster ✅
Query 13: prev= 136 ms, new=  74 ms, diff=1.84 faster ✅
Query 14: prev= 112 ms, new=  48 ms, diff=2.33 faster ✅
Query 15: prev=  78 ms, new=  48 ms, diff=1.62 faster ✅
Query 16: prev=  38 ms, new=  33 ms, diff=1.15 faster ✔
Query 17: prev= 151 ms, new= 154 ms, diff=1.02 slower ✖
Query 18: prev= 227 ms, new= 231 ms, diff=1.02 slower ✖
Query 19: prev= 176 ms, new=  76 ms, diff=2.32 faster ✅
Query 20: prev=  86 ms, new=  56 ms, diff=1.54 faster ✅
Query 21: prev= 238 ms, new= 247 ms, diff=1.04 slower ✖
Query 22: prev=  30 ms, new=  27 ms, diff=1.11 faster ✔

Comparison against main with TPCH SF1 in a remote cluster of 4 EC2 machines

==== Comparison with previous run ====
      q1: prev=1085 ms, new= 946 ms, 1.15x faster ✔
      q2: prev= 769 ms, new= 698 ms, 1.10x faster ✔
      q3: prev=1523 ms, new=1107 ms, 1.38x faster ✅
      q4: prev= 823 ms, new= 570 ms, 1.44x faster ✅
      q5: prev=1042 ms, new= 742 ms, 1.40x faster ✅
      q6: prev= 774 ms, new= 445 ms, 1.74x faster ✅
      q7: prev=1053 ms, new= 913 ms, 1.15x faster ✔
      q8: prev=1240 ms, new=1026 ms, 1.21x faster ✅
      q9: prev=1263 ms, new=1135 ms, 1.11x faster ✔
     q10: prev= 792 ms, new= 591 ms, 1.34x faster ✅
     q11: prev= 671 ms, new= 441 ms, 1.52x faster ✅
     q12: prev= 703 ms, new= 512 ms, 1.37x faster ✅
     q13: prev= 728 ms, new= 440 ms, 1.65x faster ✅
     q14: prev= 759 ms, new= 507 ms, 1.50x faster ✅
     q15: prev= 946 ms, new= 799 ms, 1.18x faster ✔
     q16: prev= 582 ms, new= 376 ms, 1.55x faster ✅
     q17: prev=1039 ms, new=1042 ms, 1.00x slower ✖
     q18: prev= 893 ms, new= 767 ms, 1.16x faster ✔
     q19: prev= 918 ms, new= 546 ms, 1.68x faster ✅
     q20: prev= 870 ms, new= 762 ms, 1.14x faster ✔
     q21: prev=1697 ms, new=1804 ms, 1.06x slower ✖
     q22: prev= 599 ms, new= 456 ms, 1.31x faster ✅

Comparison against main with TPCH SF10 in a remote cluster of 4 EC2 machines

==== Comparison with previous run ====
      q1: prev=6027 ms, new=3073 ms, 1.96x faster ✅
      q2: prev=1281 ms, new=1416 ms, 1.11x slower ✖
      q3: prev=5055 ms, new=2631 ms, 1.92x faster ✅
      q4: prev=2688 ms, new=1983 ms, 1.36x faster ✅
      q5: prev=5116 ms, new=3469 ms, 1.47x faster ✅
      q6: prev=2319 ms, new=1464 ms, 1.58x faster ✅
      q7: prev=6143 ms, new=4756 ms, 1.29x faster ✅
      q8: prev=6878 ms, new=4210 ms, 1.63x faster ✅
      q9: prev=8471 ms, new=7049 ms, 1.20x faster ✅
     q10: prev=3789 ms, new=2785 ms, 1.36x faster ✅
     q11: prev=1358 ms, new=1290 ms, 1.05x faster ✔
     q12: prev=2580 ms, new=1621 ms, 1.59x faster ✅
     q13: prev=3596 ms, new=2064 ms, 1.74x faster ✅
     q14: prev=2672 ms, new=1401 ms, 1.91x faster ✅
     q15: prev=3500 ms, new=2142 ms, 1.63x faster ✅
     q16: prev=1225 ms, new= 744 ms, 1.65x faster ✅
     q17: prev=8148 ms, new=4720 ms, 1.73x faster ✅
     q18: prev=6702 ms, new=4178 ms, 1.60x faster ✅
     q19: prev=3543 ms, new=2014 ms, 1.76x faster ✅
     q20: prev=3220 ms, new=2091 ms, 1.54x faster ✅
     q21: prev=9323 ms, new=7813 ms, 1.19x faster ✔
     q22: prev=1058 ms, new= 735 ms, 1.44x faster ✅

Comparison against Trino with TPCH SF10 in a remote cluster of 4 EC2 machines

==== Comparison with previous run ====
      q1: prev=3131 ms, new=2816 ms, 1.11x faster ✔
      q2: prev=4307 ms, new=1171 ms, 3.68x faster ✅
      q3: prev=4622 ms, new=2174 ms, 2.13x faster ✅
      q4: prev=2883 ms, new=1746 ms, 1.65x faster ✅
      q5: prev=7041 ms, new=3146 ms, 2.24x faster ✅
      q6: prev=1674 ms, new=1398 ms, 1.20x faster ✔
      q7: prev=8970 ms, new=4322 ms, 2.08x faster ✅
      q8: prev=8906 ms, new=4401 ms, 2.02x faster ✅
      q9: prev=10395 ms, new=5269 ms, 1.97x faster ✅
     q10: prev=3708 ms, new=2445 ms, 1.52x faster ✅
     q11: prev=2409 ms, new=1244 ms, 1.94x faster ✅
     q12: prev=2131 ms, new=1464 ms, 1.46x faster ✅
     q13: prev=3986 ms, new=1639 ms, 2.43x faster ✅
     q14: prev=1828 ms, new=1303 ms, 1.40x faster ✅
     q15: prev=4527 ms, new=1990 ms, 2.27x faster ✅
     q16: prev=1863 ms, new= 665 ms, 2.80x faster ✅
     q17: prev=6684 ms, new=4126 ms, 1.62x faster ✅
     q18: prev=8949 ms, new=4279 ms, 2.09x faster ✅
     q19: prev=1970 ms, new=1893 ms, 1.04x faster ✔
     q20: prev=3793 ms, new=1850 ms, 2.05x faster ✅
     q21: prev=14060 ms, new=7541 ms, 1.86x faster ✅
     q22: prev=1507 ms, new= 648 ms, 2.33x faster ✅

@gabotechs gabotechs changed the base branch from main to gabrielmusat/add-union-wrong-result-test December 18, 2025 08:28
@gabotechs gabotechs force-pushed the gabrielmusat/rework-distributed-planning-logic branch from fb3cef7 to 695c3ac Compare December 18, 2025 08:29
Comment on lines -242 to -257
#[tokio::test]
async fn test_file_scan_config_task_estimator_max_workers() -> Result<(), DataFusionError> {
    let mut combined = CombinedTaskEstimator::default();
    combined.push(|_: &Arc<dyn ExecutionPlan>, _: &ConfigOptions| None);

    let node = make_data_source_exec().await?;
    assert_eq!(
        combined.task_count(node, |mut cfg| {
            cfg.__private_channel_resolver =
                ChannelResolverExtension(Arc::new(InMemoryChannelResolver::new(2)));
            cfg
        }),
        2
    );
    Ok(())
}
Collaborator Author

@gabotechs gabotechs Dec 18, 2025


Limiting by max_workers is now done automatically in the distributed planner. No need for task estimators to handle it themselves.

@gabotechs gabotechs force-pushed the gabrielmusat/rework-distributed-planning-logic branch 2 times, most recently from 9a8e5cb to 8c38e5c Compare December 18, 2025 11:45
@gabotechs gabotechs force-pushed the gabrielmusat/rework-distributed-planning-logic branch 3 times, most recently from e890fa5 to 9d451dd Compare December 18, 2025 15:58
@gabotechs gabotechs marked this pull request as draft December 18, 2025 17:27
Base automatically changed from gabrielmusat/add-union-wrong-result-test to main December 18, 2025 17:54
@gabotechs gabotechs force-pushed the gabrielmusat/rework-distributed-planning-logic branch from 9d451dd to 27de8f5 Compare December 18, 2025 17:55
@gabotechs gabotechs marked this pull request as ready for review December 19, 2025 08:54
@gabotechs gabotechs force-pushed the gabrielmusat/rework-distributed-planning-logic branch from 91885af to 7561af4 Compare December 19, 2025 09:09
Collaborator

@jayshrivastava jayshrivastava left a comment


Flushing comments. Still need to look at the updated tpch plan tests

Comment thread src/distributed_planner/plan_annotator.rs
Comment thread src/distributed_planner/plan_annotator.rs
Comment thread src/distributed_planner/plan_annotator.rs
Comment thread src/distributed_planner/plan_annotator.rs Outdated
Comment thread src/distributed_planner/distributed_physical_optimizer_rule.rs
Comment thread src/metrics/task_metrics_rewriter.rs
Collaborator

@NGA-TRAN NGA-TRAN left a comment


The tests clearly tell me you are doing the right things. Great work @gabotechs

I assume you changed the number of file groups in the tests on purpose.

Comment thread src/distributed_planner/distributed_physical_optimizer_rule.rs
Comment thread src/distributed_planner/plan_annotator.rs
│ PartitionIsolatorExec: t0:[p0,p1,__,__,__,__] t1:[__,__,p0,p1,__,__] t2:[__,__,__,__,p0,__] t3:[__,__,__,__,__,p0] , metrics=[]
│ DataSourceExec: file_groups={6 groups: [[/testdata/tpch/explain_analyze_sf0.1/lineitem/1.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/10.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/12.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/13.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/14.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/14.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/15.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/16.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/16.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/2.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/3.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/4.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/4.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/5.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/6.parquet:<int>..<int>, /testdata/tpch/explain_analyze_sf0.1/lineitem/7.parquet:<int>..<int>], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@6 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows=600572, elapsed_compute=<metric>, batches_split=<metric>, bytes_scanned=<metric>, file_open_errors=<metric>, file_scan_errors=<metric>, num_predicate_creation_errors=<metric>, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=<metric>, pushdown_rows_matched=<metric>, pushdown_rows_pruned=<metric>, bloom_filter_eval_time=<metric>, 
metadata_load_time=<metric>, page_index_eval_time=<metric>, row_pushdown_eval_time=<metric>, statistics_eval_time=<metric>, time_elapsed_opening=<metric>, time_elapsed_processing=<metric>, time_elapsed_scanning_total=<metric>, time_elapsed_scanning_until_data=<metric>]
│ PartitionIsolatorExec: t0:[p0,p1,p2,p3,p4,p5,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__] t1:[__,__,__,__,__,__,p0,p1,p2,p3,p4,__,__,__,__,__,__,__,__,__,__] t2:[__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4,__,__,__,__,__] t3:[__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,__,p0,p1,p2,p3,p4] , metrics=[]
│ DataSourceExec: file_groups={21 groups: [[/testdata/tpch/explain_analyze_sf0.1/lineitem/1.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/10.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/11.parquet:<int>..<int>], [/testdata/tpch/explain_analyze_sf0.1/lineitem/12.parquet:<int>..<int>], ...]}, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], file_type=parquet, predicate=l_shipdate@6 <= 1998-09-02, pruning_predicate=l_shipdate_null_count@1 != row_count@2 AND l_shipdate_min@0 <= 1998-09-02, required_guarantees=[], metrics=[output_rows=600572, elapsed_compute=<metric>, batches_split=<metric>, bytes_scanned=<metric>, file_open_errors=<metric>, file_scan_errors=<metric>, num_predicate_creation_errors=<metric>, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=<metric>, pushdown_rows_matched=<metric>, pushdown_rows_pruned=<metric>, bloom_filter_eval_time=<metric>, metadata_load_time=<metric>, page_index_eval_time=<metric>, row_pushdown_eval_time=<metric>, statistics_eval_time=<metric>, time_elapsed_opening=<metric>, time_elapsed_processing=<metric>, time_elapsed_scanning_total=<metric>, time_elapsed_scanning_until_data=<metric>]
Collaborator


21 groups on purpose? I don't see any configuration/setting changes.

Collaborator


I have the same question. The output rows on the aggregate exec went up to 64....

Our correctness tests didn't fail though 🤔

Collaborator Author


I just debugged this query specifically, and the reason why 21 files are chosen is:

  • originally, the plan contained 6 file groups, with the following distribution of partitioned files: [3,4,3,4,4,3]
  • now, we attempt to split it as much as possible to distribute work, and we end up with 21 file groups with 1 file each
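The regrouping can be sketched as follows (illustrative only, not the actual file-group splitting logic):

```rust
// Hedged sketch: flattening grouped files into one group per file maximizes
// the number of units of work that can be spread across tasks.
fn split_into_single_file_groups(groups: Vec<Vec<String>>) -> Vec<Vec<String>> {
    groups
        .into_iter()
        .flatten()
        .map(|file| vec![file])
        .collect()
}
```

With the original distribution of `[3, 4, 3, 4, 4, 3]` partitioned files, this turns 6 groups into 21 single-file groups.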

Collaborator Author


About the number of output_rows going from 24 to 64, I'm not 100% sure why that happens, but it should not make the result incorrect. The partial aggregation now outputs more rows, maybe because it's partially aggregating more, smaller batches?

Collaborator


Sure, I think that's fine. It makes sense for the partial aggregation to output more rows since there are more partitions.

Comment thread tests/tpch_explain_analyze.rs
@gabotechs
Collaborator Author

Just merged #263 into this one. It mainly just adds some unit tests to the plan annotator logic.

Comment thread src/distributed_planner/plan_annotator.rs Outdated
Comment thread src/distributed_planner/task_estimator.rs
Comment thread src/distributed_planner/plan_annotator.rs
Comment thread src/distributed_planner/plan_annotator.rs
Collaborator

@jayshrivastava jayshrivastava left a comment


LGTM. The annotation tests you added are great. Made it easier to review for sure.

@gabotechs gabotechs merged commit 7636da1 into main Dec 24, 2025
6 checks passed
@gabotechs gabotechs deleted the gabrielmusat/rework-distributed-planning-logic branch December 24, 2025 09:09