
Distribute UNION operations #262

Merged
gabotechs merged 30 commits into main from gabrielmusat/distribute-unions
Dec 29, 2025

Collaborator

@gabotechs gabotechs commented Dec 19, 2025

This PR builds on top of #259 and uses its tools for distributing UNION operations.

Unions are distributed with a new ChildrenIsolatorUnionExec operator, which selectively executes its children depending on the index of the distributed task in which it is running.

The easiest way to picture how this works is the simplest case, where the union runs in as many tasks as it has children. In that case, each task executes exactly one child:

┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
│           Task 1            ││           Task 2            ││           Task 3            │
│┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
│└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
│    │                        ││              │              ││                        │    │
│┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
│└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘

It gets more complicated if there are either more tasks than children or more children than tasks.

For example, if there are more children than tasks, their execution is spread as evenly as possible across the distributed tasks:

┌─────────────────────────────┐┌─────────────────────────────┐
│           Task 1            ││           Task 2            │
│┌───────────────────────────┐││┌───────────────────────────┐│
││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
│└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
│    │         │              ││                        │    │
│┌───┴───┐ ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌ ─ ┴ ─ ┐ ┌───┴───┐│
││Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2  │Child 3││
│└───────┘ └───────┘ └  ─  ─  ││└  ─  ─   └ ─ ─ ─ ┘ └───────┘│
└─────────────────────────────┘└─────────────────────────────┘
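
As a rough illustration of the "more children than tasks" case (a sketch, not the code shipped in this PR), the assignment in the diagram above can be reproduced by giving each task a contiguous range of children, with the first tasks taking one extra child when the division is not exact. The function name `children_for_task` and its signature are hypothetical, and it assumes `n_tasks > 0`:

```rust
use std::ops::Range;

/// Hypothetical helper: returns the range of child indices that the task
/// with index `task_idx` would execute. Assumes `n_tasks > 0` and
/// `task_idx < n_tasks`.
fn children_for_task(task_idx: usize, n_tasks: usize, n_children: usize) -> Range<usize> {
    // Every task gets at least `base` children...
    let base = n_children / n_tasks;
    // ...and the first `extra` tasks get one more.
    let extra = n_children % n_tasks;
    // Tasks before this one took `base` children each, plus one extra
    // child for each of them that falls within the first `extra` tasks.
    let start = task_idx * base + task_idx.min(extra);
    let len = base + if task_idx < extra { 1 } else { 0 };
    start..start + len
}
```

With 3 children and 2 tasks, task 0 gets children `0..2` (Child 1 and Child 2) and task 1 gets `2..3` (Child 3), which matches the diagram.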

If there are more tasks than children, some children might not be able to act as if they were running on a single node, and they might need to be executed in a "reduced" distributed context. In the diagram below, Child 1 is split across Task 1 and Task 2:

┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
│           Task 1            ││           Task 2            ││           Task 3            ││           Task 4            │
│┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
│└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
│    │                        ││    │                        ││              │              ││                        │    │
│┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
││Child 1│  Child 2│  Child 3││││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
││ (1/2) │ └  ─  ─   └  ─  ─  │││ (2/2) │ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
│└───────┘                    ││└───────┘                    ││                             ││                             │
└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
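
A minimal sketch of the "more tasks than children" case, modeled on the `extra_tasks` / `task_count` arithmetic visible in the review snippet further down. The function name, the return type, and the definition of `tasks_per_child` as `n_tasks / children_count` are assumptions made for illustration, and the sketch assumes every child is splittable and `children_count > 0`:

```rust
/// Hypothetical helper: for each task, in order, returns
/// `(child_idx, child_task_idx, child_task_count)`, i.e. which child the
/// task executes and which slice of that child's "reduced" distributed
/// context it represents. Assumes `children_count > 0`.
fn assign_tasks_to_children(n_tasks: usize, children_count: usize) -> Vec<(usize, usize, usize)> {
    // Assumed definition: only `extra_tasks` and `task_count` appear in
    // the snippet quoted in the review.
    let tasks_per_child = n_tasks / children_count;
    let extra_tasks = n_tasks % children_count;

    (0..children_count)
        .flat_map(move |child_idx| {
            // The first `extra_tasks` children receive one additional task.
            let task_count = tasks_per_child + if child_idx < extra_tasks { 1 } else { 0 };
            (0..task_count).map(move |i| (child_idx, i, task_count))
        })
        .collect()
}
```

With 4 tasks and 3 children this yields `(0, 0, 2), (0, 1, 2), (1, 0, 1), (2, 0, 1)`: Child 1 runs as (1/2) and (2/2) in the first two tasks, and Child 2 and Child 3 each run in a single task, as in the diagram.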

Impact of this PR on the TPC-DS benchmarks that use UNION operations (queries 2, 5, 14, 26, 36, 49, 54, 71, 75, 76, 77, 80):

$ WORKERS=8 ./benchmarks/run.sh --threads 2 --files-per-task 2 --path benchmarks/data/tpcds_sf1 --query 2,5,14,26,36,49,54,71,75,76,77,80
VS
$ WORKERS=8 ./benchmarks/run.sh --threads 2 --files-per-task 2 --path benchmarks/data/tpcds_sf1 --query 2,5,14,26,36,49,54,71,75,76,77,80 --children-isolator-unions
==== Comparison with the previous benchmark from 2025-12-26 12:58:34 UTC ====
os:        macos
arch:      aarch64
cpu cores: 16
threads:   2 -> 2
workers:   8 -> 8
=============================================================================
 Tpcds 2: prev=  82 ms, new=  70 ms, diff=1.17 faster ✔
 Tpcds 5: prev=  83 ms, new=  77 ms, diff=1.08 faster ✔
Tpcds 14: prev= 563 ms, new= 443 ms, diff=1.27 faster ✅
Tpcds 26: prev=  49 ms, new=  50 ms, diff=1.02 slower ✖
Tpcds 36: prev= 169 ms, new=  98 ms, diff=1.72 faster ✅
Tpcds 49: prev= 111 ms, new=  72 ms, diff=1.54 faster ✅
Tpcds 54: prev= 102 ms, new=  62 ms, diff=1.65 faster ✅
Tpcds 71: prev=  50 ms, new=  42 ms, diff=1.19 faster ✔
Tpcds 75: prev= 265 ms, new= 217 ms, diff=1.22 faster ✅
Tpcds 76: prev=  56 ms, new=  48 ms, diff=1.17 faster ✔
Tpcds 77: prev=  68 ms, new=  63 ms, diff=1.08 faster ✔
Tpcds 80: prev= 173 ms, new= 143 ms, diff=1.21 faster ✅
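
For reading the `diff=` column: the numbers above are consistent with a plain ratio of the two timings, rounded to two decimals (`prev / new` when the new run is faster, `new / prev` when it is slower). The helper below is a hypothetical sketch of that arithmetic, not the benchmark tool's actual code, and it assumes both timings are non-zero:

```rust
/// Hypothetical helper that reproduces the `diff=` column from two
/// timings in milliseconds. Assumes both values are non-zero.
fn describe_diff(prev_ms: u64, new_ms: u64) -> String {
    let (prev, new) = (prev_ms as f64, new_ms as f64);
    if new <= prev {
        // The new run took less time: report how many times faster it is.
        format!("{:.2} faster", prev / new)
    } else {
        // The new run took more time: report how many times slower it is.
        format!("{:.2} slower", new / prev)
    }
}
```

For example, `describe_diff(82, 70)` gives `1.17 faster` (query 2) and `describe_diff(49, 50)` gives `1.02 slower` (query 26).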

@gabotechs gabotechs changed the base branch from main to gabrielmusat/rework-distributed-planning-logic December 19, 2025 19:34
Collaborator

@NGA-TRAN NGA-TRAN left a comment

Very nice. I just want to know whether you have verified that a child is splittable before splitting it.

/// │┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
/// ││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
/// │└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
/// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
Collaborator

This looks good

/// │┌───┴───┐ ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌ ─ ┴ ─ ┐ ┌───┴───┐│
/// ││Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2  │Child 3││
/// │└───────┘ └───────┘ └  ─  ─  ││└  ─  ─   └ ─ ─ ─ ┘ └───────┘│
/// └─────────────────────────────┘└─────────────────────────────┘
Collaborator

This looks good

/// ││Child 1│  Child 2│  Child 3││││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
/// ││ (1/2) │ └  ─  ─   └  ─  ─  │││ (2/2) │ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
/// │└───────┘                    ││└───────┘                    ││                             ││                             │
/// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
Collaborator

This needs more thinking. You can only do this if the child is splittable.

  • DataSource and (Sub)Union are splittable
  • Other operators that can handle individual partitions are splittable. Example:
    • Partitioned hash join is splittable. CollectLeft is not
    • Aggregation with preserved partitioning is splittable

Have you considered them? Maybe in the first round you only split the ones you think are safe to split, and add more as we go.

Collaborator Author

@gabotechs gabotechs Dec 22, 2025

This will in fact only be done if "Child 1" is splittable. If the child contains a CollectLeft HashJoin at any point in the subtree, it will not be split.

The doc comment just shows how it works in case it can be split, and in case it makes sense to split it.

│ FilterExec: Temp9am@0 > 15
│ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@0 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[]
└──────────────────────────────────────────────────
Collaborator

I think if you add a line or something between the children for us to know they will go to different tasks, it will be easier to read

Collaborator

And if you split one of them into many tasks, you may want to add something for us to know that, too

let extra_tasks = n_tasks % children_count;

EitherIterator::Right((0..children_count).flat_map(move |child_idx| {
let task_count = tasks_per_child + if child_idx < extra_tasks { 1 } else { 0 };
Collaborator

Yeah, here we need to verify if the tasks are splittable

Collaborator Author

This piece of code is not what checks whether a child can be split. This PR does not ship any extra mechanism for deciding that; it relies on the one from "Rework distributed planning logic":

  • A CollectLeft HashJoin (or any other non-distributable node) gets a task count annotation of Maximum(1)
  • Nodes upstream see that, and they are also marked as Maximum(1)
  • The union will not try to scale the CollectLeft HashJoin (or any other non-distributable node) further, respecting its original task count annotation, therefore not distributing it.
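
The three points above can be sketched as a bottom-up annotation pass. This is only an illustration under stated assumptions: the `Node` struct, the `Desired` variant, and the functions `annotate` and `tasks_for_child` are hypothetical names invented here, not the types from "Rework distributed planning logic"; only the `Maximum(1)` annotation comes from the description above.

```rust
/// Hypothetical task count annotation. Only `Maximum` is mentioned in the
/// discussion; `Desired` is an assumed counterpart for unconstrained nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskCount {
    Desired(usize),
    Maximum(usize),
}

/// Hypothetical, simplified plan node.
struct Node {
    /// `false` for nodes such as a CollectLeft HashJoin.
    distributable: bool,
    children: Vec<Node>,
}

/// Annotates a subtree bottom-up: a non-distributable node is capped at
/// one task, and any node above a capped node inherits that cap.
fn annotate(node: &Node, desired: usize) -> TaskCount {
    if !node.distributable {
        return TaskCount::Maximum(1);
    }
    let mut result = TaskCount::Desired(desired);
    for child in &node.children {
        if let TaskCount::Maximum(child_max) = annotate(child, desired) {
            result = match result {
                TaskCount::Maximum(m) => TaskCount::Maximum(m.min(child_max)),
                TaskCount::Desired(_) => TaskCount::Maximum(child_max),
            };
        }
    }
    result
}

/// The union offers `offered` tasks to a child, but never more than the
/// child's annotation allows.
fn tasks_for_child(annotation: TaskCount, offered: usize) -> usize {
    match annotation {
        TaskCount::Maximum(max) => offered.min(max),
        TaskCount::Desired(_) => offered,
    }
}
```

Under these assumptions, a union child whose subtree contains a non-distributable node is annotated `Maximum(1)`, so even if the union has two tasks to offer, `tasks_for_child` returns 1 and the child is not split.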

@gabotechs gabotechs changed the base branch from gabrielmusat/rework-distributed-planning-logic to gabrielmusat/add-annotation-tests December 20, 2025 09:12
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from 8316861 to a683c7f Compare December 20, 2025 09:12
@gabotechs gabotechs force-pushed the gabrielmusat/add-annotation-tests branch from 1825247 to d9c33eb Compare December 20, 2025 13:08
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch 3 times, most recently from 1dc69fc to 1f01974 Compare December 22, 2025 07:16
@gabotechs gabotechs changed the base branch from gabrielmusat/add-annotation-tests to gabrielmusat/add-tpcds-plan-tests December 22, 2025 07:16
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch 5 times, most recently from dce6bd8 to b84bd19 Compare December 22, 2025 11:10
@gabotechs gabotechs force-pushed the gabrielmusat/add-tpcds-plan-tests branch from 3444d69 to 67f1ed5 Compare December 22, 2025 11:15
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from b84bd19 to 26b3bf5 Compare December 22, 2025 11:15
@gabotechs gabotechs force-pushed the gabrielmusat/add-tpcds-plan-tests branch from 67f1ed5 to 09d8017 Compare December 22, 2025 11:22
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch 2 times, most recently from d344890 to 46a0781 Compare December 22, 2025 11:25
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from cb202e7 to 4b52c6a Compare December 22, 2025 13:16
Collaborator

@NGA-TRAN NGA-TRAN left a comment

LGTM. Let’s start using it for the obvious use cases first — I think that will naturally lead us to more improvements and better support for the more complex ones.

@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from d17a3ea to 07b998a Compare December 24, 2025 10:32
@gabotechs gabotechs force-pushed the gabrielmusat/add-tpcds-plan-tests branch from cc9535c to 8aed316 Compare December 24, 2025 15:57
@gabotechs gabotechs changed the base branch from gabrielmusat/add-tpcds-plan-tests to gabrielmusat/tpcds-benchmarks December 24, 2025 16:38
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from ca276db to 2b4df44 Compare December 24, 2025 16:43
@gabotechs gabotechs force-pushed the gabrielmusat/tpcds-benchmarks branch from edaeb2e to a515e78 Compare December 25, 2025 10:19
@gabotechs gabotechs force-pushed the gabrielmusat/tpcds-benchmarks branch from 633c224 to af20815 Compare December 26, 2025 11:59
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from 0b9261d to 9203e93 Compare December 26, 2025 12:00
@gabotechs gabotechs force-pushed the gabrielmusat/tpcds-benchmarks branch from af20815 to f9f4439 Compare December 27, 2025 15:45
@gabotechs gabotechs force-pushed the gabrielmusat/distribute-unions branch from 7e97a87 to 9352b6b Compare December 27, 2025 15:46
@gabotechs gabotechs deleted the branch main December 28, 2025 10:53
@gabotechs gabotechs closed this Dec 28, 2025
@gabotechs gabotechs reopened this Dec 28, 2025
Base automatically changed from gabrielmusat/tpcds-benchmarks to main December 29, 2025 11:43
# Conflicts:
#	benchmarks/README.md
#	benchmarks/src/main.rs
#	benchmarks/src/run.rs
#	examples/localhost_run.rs
#	src/networking/channel_resolver.rs
@gabotechs gabotechs merged commit a94556a into main Dec 29, 2025
7 checks passed
@gabotechs gabotechs deleted the gabrielmusat/distribute-unions branch December 29, 2025 13:14