Conversation
NGA-TRAN
left a comment
There was a problem hiding this comment.
Very nice. I just want to know if you have verified whether the a child is splittable before splitting
| /// │┌───┴───┐ ┌ ─│ ─ ┌ ─│ ─ ││┌ ─│ ─ ┌───┴───┐ ┌ ─│ ─ ││┌ ─│ ─ ┌ ─│ ─ ┌───┴───┐│ | ||
| /// ││Child 1│ Child 2│ Child 3│││ Child 1│ │Child 2│ Child 3│││ Child 1│ Child 2│ │Child 3││ | ||
| /// │└───────┘ └ ─ ─ └ ─ ─ ││└ ─ ─ └───────┘ └ ─ ─ ││└ ─ ─ └ ─ ─ └───────┘│ | ||
| /// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘ |
| /// │┌───┴───┐ ┌───┴───┐ ┌ ─│ ─ ││┌ ─│ ─ ┌ ─ ┴ ─ ┐ ┌───┴───┐│ | ||
| /// ││Child 1│ │Child 2│ Child 3│││ Child 1│ Child 2 │Child 3││ | ||
| /// │└───────┘ └───────┘ └ ─ ─ ││└ ─ ─ └ ─ ─ ─ ┘ └───────┘│ | ||
| /// └─────────────────────────────┘└─────────────────────────────┘ |
| /// ││Child 1│ Child 2│ Child 3││││Child 1│ Child 2│ Child 3│││ Child 1│ │Child 2│ Child 3│││ Child 1│ Child 2│ │Child 3││ | ||
| /// ││ (1/2) │ └ ─ ─ └ ─ ─ │││ (2/2) │ └ ─ ─ └ ─ ─ ││└ ─ ─ └───────┘ └ ─ ─ ││└ ─ ─ └ ─ ─ └───────┘│ | ||
| /// │└───────┘ ││└───────┘ ││ ││ │ | ||
| /// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘ |
There was a problem hiding this comment.
This needs more thinking. You can only do this if the child is splittable.
- DataSource and (Sub)Union : are splittable
- Other operators that can handle individual partitions are splittable. Example:
- Partitioned hash join is splittable. CollectLeft is not
- Aggregation with preserved partitioning is splittable
Have you considered them? Maybe in the first round, you only split those you think safe to split first and add more as we go
There was a problem hiding this comment.
This will in fact only be done if "Child 1" is splittable. If the child contains a CollectLeft HashJoin at any point in the subtree, it will not be split.
The doc comment just shows how it works in case it can be split, and in case it makes sense to split it.
| │ FilterExec: Temp9am@0 > 15 | ||
| │ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | ||
| │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[Temp9am, RainToday], file_type=parquet, predicate=Temp9am@0 > 15, pruning_predicate=Temp9am_null_count@1 != row_count@2 AND Temp9am_max@0 > 15, required_guarantees=[] | ||
| └────────────────────────────────────────────────── |
There was a problem hiding this comment.
I think if you add a line or something between the children for us to know they will go to different tasks, it will be easier to read
There was a problem hiding this comment.
And if you split one of them into many tasks, you may want to add something for us to know that, too
| let extra_tasks = n_tasks % children_count; | ||
|
|
||
| EitherIterator::Right((0..children_count).flat_map(move |child_idx| { | ||
| let task_count = tasks_per_child + if child_idx < extra_tasks { 1 } else { 0 }; |
There was a problem hiding this comment.
Yeah, here we need to verify if the tasks are splittable
There was a problem hiding this comment.
It's not really this piece of code the one that checks if the child can be split. For deciding if a child can be split or not, there is not really any extra mechanism shipped in this PR, the mechanism is the one from Rework distributed planning logic:
- A CollectLeft HashJoin (or any other non-distributable node) gets a task count annotation of
Maximum(1) - Nodes upstream see that, and they are also marked as
Maximum(1) - The union will not try to scale the CollectLeft HashJoin (or any other non-distributable node) further, respecting its original task count annotation, therefore not distributing it.
8316861 to
a683c7f
Compare
1825247 to
d9c33eb
Compare
1dc69fc to
1f01974
Compare
dce6bd8 to
b84bd19
Compare
3444d69 to
67f1ed5
Compare
b84bd19 to
26b3bf5
Compare
67f1ed5 to
09d8017
Compare
d344890 to
46a0781
Compare
cb202e7 to
4b52c6a
Compare
NGA-TRAN
left a comment
There was a problem hiding this comment.
LGTM. Let’s start using it for the obvious use cases first — I think that will naturally lead us to more improvements and better support for the more complex ones.
d17a3ea to
07b998a
Compare
cc9535c to
8aed316
Compare
ca276db to
2b4df44
Compare
edaeb2e to
a515e78
Compare
633c224 to
af20815
Compare
0b9261d to
9203e93
Compare
af20815 to
f9f4439
Compare
7e97a87 to
9352b6b
Compare
# Conflicts: # benchmarks/README.md # benchmarks/src/main.rs # benchmarks/src/run.rs # examples/localhost_run.rs # src/networking/channel_resolver.rs
This PR builds on top of #259 and uses its tools for distributing UNION operations.
The way unions are distributed is with a new
ChildrenIsolatorUnionExecoperator, that will selectively execute its children depending on the distributed task index in which its running.The best way of imagining how this works is by looking at the simplest case, when the union operation runs in the same amount of tasks than children it has. In that case, each tasks executes one child respectively:
It can get complicated if there are either more tasks than children, ore more children than tasks.
For example if there are more children than tasks, their execution will be spread as evenly as possible across distributed tasks:
If there are more tasks than children, then some children might not be able to act as if they were running in single-node, and they might need to be executed in a "reduced" distributed context:
Impact of this PR in the TPC-DS benchmarks that use UNION operations (queries 2,5,14,26,36,49,54,71,75,76,77,80):