Matt/recursive cte eliminate distribution and coalesce in recursive children #2

Draft
matthewgapp wants to merge 15 commits into matt/recursive-cte from matt/recursive-cte-eliminate-distribution-and-coalesce-in-recursive-children

matthewgapp (Owner) commented Sep 21, 2023

This is a WIP. It attempts to eliminate unnecessary (and costly) physical plan steps within a recursive query. In the example below this speeds the query up by roughly 30x (6.5 seconds to 220 ms), and the gain may be larger in some cases.

Specifically, it prevents RepartitionExec and the coalesce operators (CoalesceBatchesExec and CoalescePartitionsExec in the plans below) from being inserted when they would otherwise be descendants of a RecursiveQueryExec. It still allows them to be applied to parts of the plan that are not under a RecursiveQueryExec.

The PR makes changes to DistributionContext and introduces the context pattern into the Coalesce physical optimizer to determine if a given plan is within a RecursiveQuery.
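One way to picture the context pattern described above is a top-down rewrite that carries an "inside a recursive query" flag from parent to child. The sketch below uses a toy `Plan` struct rather than DataFusion's `ExecutionPlan` trait, and the names `Ctx`, `is_redistribution`, and `strip_under_recursion` are hypothetical. It also strips operators after the fact, whereas the PR describes preventing their insertion, so treat it only as an illustration of how the flag propagates.

```rust
// Toy stand-in for a physical plan node (assumption: not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    name: String,
    children: Vec<Plan>,
}

impl Plan {
    fn new(name: &str, children: Vec<Plan>) -> Self {
        Plan { name: name.to_string(), children }
    }
}

// Hypothetical context passed down the tree during the rewrite.
#[derive(Clone, Copy, Default)]
struct Ctx {
    in_recursive_query: bool,
}

// Operators this sketch treats as repartitioning or coalescing steps.
fn is_redistribution(name: &str) -> bool {
    matches!(
        name,
        "RepartitionExec" | "CoalesceBatchesExec" | "CoalescePartitionsExec"
    )
}

// Removes single-child redistribution nodes, but only below a RecursiveQueryExec.
fn strip_under_recursion(plan: Plan, ctx: Ctx) -> Plan {
    if ctx.in_recursive_query && is_redistribution(&plan.name) && plan.children.len() == 1 {
        // Replace the node with its (rewritten) only child.
        let child = plan.children.into_iter().next().unwrap();
        return strip_under_recursion(child, ctx);
    }
    // The flag switches on at RecursiveQueryExec and stays on for all descendants.
    let child_ctx = Ctx {
        in_recursive_query: ctx.in_recursive_query || plan.name == "RecursiveQueryExec",
    };
    let children = plan
        .children
        .into_iter()
        .map(|c| strip_under_recursion(c, child_ctx))
        .collect();
    Plan { name: plan.name, children }
}
```

Calling `strip_under_recursion(plan, Ctx::default())` on a tree leaves a RepartitionExec above the RecursiveQueryExec untouched and removes the ones beneath it, which mirrors the difference between the Before and After plans.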

Example:

EXPLAIN WITH RECURSIVE nodes(id, region, account, increase) AS (
    SELECT CAST(id AS BIGINT) AS id, users.region AS region, account, 0 AS increase
    FROM users

    UNION ALL

    SELECT id + 1 AS id, growth.region AS region, account + growth.increase AS account, growth.increase AS increase
    FROM nodes
    JOIN growth ON nodes.region = growth.region
    WHERE id < 10000
)
SELECT id, account, region FROM nodes
WHERE id > 9950

Before

cargo run -r time: 6.5 seconds

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                           |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: users.id, users.account, region                                                                                                                    |
|               |   Filter: CAST(users.id AS Int64) > Int64(99500)                                                                                                               |
|               |     Projection: users.id, region, users.account                                                                                                                |
|               |       RecursiveQuery: is_distinct=false                                                                                                                        |
|               |         Projection: users.id, users.region AS region, users.account, Int64(0) AS increase                                                                      |
|               |           TableScan: users projection=[id, account, region]                                                                                                    |
|               |         Projection: CAST(users.id AS Int64) + Int64(1) AS id, growth.region AS region, users.account + growth.increase AS account, growth.increase AS increase |
|               |           Inner Join: nodes.region = growth.region                                                                                                             |
|               |             Filter: CAST(users.id AS Int64) < Int64(100000)                                                                                                    |
|               |               Projection: users.id, nodes.region, users.account                                                                                                |
|               |                 NamedRelation: nodes                                                                                                                           |
|               |             TableScan: growth projection=[increase, region]                                                                                                    |
| physical_plan | ProjectionExec: expr=[id@0 as id, account@2 as account, region@1 as region]                                                                                    |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                                                                  |
|               |     FilterExec: CAST(id@0 AS Int64) > 99500                                                                                                                    |
|               |       RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                                    |
|               |         ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                                            |
|               |           RecursiveQueryExec: is_distinct=false                                                                                                                |
|               |             ProjectionExec: expr=[id@0 as id, region@2 as region, account@1 as account, 0 as increase]                                                         |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                                    |
|               |             CoalescePartitionsExec                                                                                                                             |
|               |               ProjectionExec: expr=[CAST(id@0 AS Int64) + 1 as id, region@4 as region, account@2 + increase@3 as account, increase@3 as increase]              |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                    |
|               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(region@1, region@1)]                                                                   |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                |
|               |                       RepartitionExec: partitioning=Hash([region@1], 12), input_partitions=12                                                                  |
|               |                         CoalesceBatchesExec: target_batch_size=8192                                                                                            |
|               |                           FilterExec: CAST(id@0 AS Int64) < 100000                                                                                             |
|               |                             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                              |
|               |                               ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                      |
|               |                                 ContinuanceExec: name=nodes                                                                                                    |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                |
|               |                       RepartitionExec: partitioning=Hash([region@1], 12), input_partitions=12                                                                  |
|               |                         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                  |
|               |                           MemoryExec: partitions=1, partition_sizes=[1]                                                                                        |
|               |                                                                                                                                                                |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+

After

cargo run -r time: 220ms

+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                            |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: id, users.account, region                                                                                                           |
|               |   Filter: id > Int64(9950)                                                                                                                      |
|               |     Projection: id, region, users.account                                                                                                       |
|               |       RecursiveQuery: is_distinct=false                                                                                                         |
|               |         Projection: CAST(users.id AS Int64) AS id, users.region AS region, users.account, Int64(0) AS increase                                  |
|               |           TableScan: users projection=[id, account, region]                                                                                     |
|               |         Projection: nodes.id + Int64(1) AS id, growth.region AS region, users.account + growth.increase AS account, growth.increase AS increase |
|               |           Inner Join: nodes.region = growth.region                                                                                              |
|               |             Filter: nodes.id < Int64(10000)                                                                                                     |
|               |               Projection: nodes.id, nodes.region, users.account                                                                                 |
|               |                 NamedRelation: nodes                                                                                                            |
|               |             TableScan: growth projection=[increase, region]                                                                                     |
| physical_plan | ProjectionExec: expr=[id@0 as id, account@2 as account, region@1 as region]                                                                     |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                                                   |
|               |     FilterExec: id@0 > 9950                                                                                                                     |
|               |       RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1                                                                     |
|               |         ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                             |
|               |           RecursiveQueryExec: is_distinct=false                                                                                                 |
|               |             ProjectionExec: expr=[CAST(id@0 AS Int64) as id, region@2 as region, account@1 as account, 0 as increase]                           |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                     |
|               |             ProjectionExec: expr=[id@0 + 1 as id, region@4 as region, account@2 + increase@3 as account, increase@3 as increase]                |
|               |               HashJoinExec: mode=Partitioned, join_type=Inner, on=[(region@1, region@1)]                                                        |
|               |                 FilterExec: id@0 < 10000                                                                                                        |
|               |                   ProjectionExec: expr=[id@0 as id, region@1 as region, account@2 as account]                                                   |
|               |                     ContinuanceExec: name=nodes                                                                                                 |
|               |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                   |
|               |                                                                                                                                                 |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
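The difference between the two physical plans could be checked mechanically. The following is a minimal sketch, assuming the plan text has already been extracted from the `plan` column (table borders removed, leading indentation kept). The helper name `unwanted_under_recursion` and the `UNWANTED` list are hypothetical and not part of the PR.

```rust
// Operators the PR description says should not appear under RecursiveQueryExec.
const UNWANTED: [&str; 3] = [
    "RepartitionExec",
    "CoalesceBatchesExec",
    "CoalescePartitionsExec",
];

// Returns every unwanted operator line found beneath a RecursiveQueryExec.
// `plan` is indented plan text with one operator per line.
fn unwanted_under_recursion(plan: &str) -> Vec<String> {
    let mut found = Vec::new();
    // Indentation of the enclosing RecursiveQueryExec, if we are inside one.
    let mut recursive_indent: Option<usize> = None;
    for line in plan.lines() {
        let trimmed = line.trim_start();
        if trimmed.is_empty() {
            continue;
        }
        let indent = line.len() - trimmed.len();
        // A line at the same or shallower indentation ends the recursive subtree.
        if let Some(ri) = recursive_indent {
            if indent <= ri {
                recursive_indent = None;
            }
        }
        if recursive_indent.is_some() {
            if UNWANTED.iter().any(|op| trimmed.starts_with(*op)) {
                found.push(trimmed.trim_end().to_string());
            }
        } else if trimmed.starts_with("RecursiveQueryExec") {
            recursive_indent = Some(indent);
        }
    }
    found
}
```

Applied to the Before plan this would report the coalesce and repartition operators nested under RecursiveQueryExec. Applied to the After plan it should return an empty list, while the RepartitionExec and CoalesceBatchesExec above RecursiveQueryExec are ignored.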

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@matthewgapp matthewgapp force-pushed the matt/recursive-cte-eliminate-distribution-and-coalesce-in-recursive-children branch from 81c3439 to b7e59f7 Compare November 18, 2023 22:08
matthewgapp (Owner, Author) commented:

Rebased onto the updated (rebased) CTE PR. I plan to mark this PR as ready by tomorrow.

@matthewgapp matthewgapp force-pushed the matt/recursive-cte-eliminate-distribution-and-coalesce-in-recursive-children branch from 2461576 to ce3cdcd Compare November 19, 2023 01:12
@matthewgapp matthewgapp force-pushed the matt/recursive-cte branch 2 times, most recently from 7dba5b6 to 219de0c Compare January 9, 2024 06:27
Repository owner deleted a comment from tobarbaro Feb 10, 2024
matthewgapp pushed a commit that referenced this pull request Apr 12, 2024
* refactor `TreeNode::rewrite()`

* use handle_tree_recursion in `Expr`

* use macro for transform recursions

* fix api

* minor fixes

* fix

* don't trust `t.transformed` coming from transformation closures, keep the old way of detecting if changes were made

* rephrase todo comment, always propagate up `t.transformed` from the transformation closure, fix projection pushdown closure

* Fix `TreeNodeRecursion` docs

* extend Skip (Prune) functionality to Jump as it is defined in https://synnada.notion.site/synnada/TreeNode-Design-Proposal-bceac27d18504a2085145550e267c4c1

* fix Jump and add tests

* jump test fixes

* fix clippy

* unify "transform" traversals using macros, fix "visit" traversal jumps, add visit jump tests, ensure consistent naming `f` instead of `op`, `f_down` instead of `pre_visit` and `f_up` instead of `post_visit`

* fix macro rewrite

* minor fixes

* minor fix

* refactor tests

* add transform tests

* add apply, transform_down and transform_up tests

* refactor tests

* test jump on both a and e nodes in both top-down and bottom-up traversals

* better transform/rewrite tests

* minor fix

* simplify tests

* add stop tests, reorganize tests

* fix previous merges and remove leftover file

* Review TreeNode Refactor (#1)

* Minor changes

* Jump doesn't ignore f_up

* update test

* Update rewriter

* LogicalPlan visit update and propagate from children flags

* Update tree_node.rs

* Update map_children's

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>

* fix

* minor fixes

* fix f_up call when f_down returns jump

* simplify code

* minor fix

* revert unnecessary changes

* fix `DynTreeNode` and `ConcreteTreeNode` `transformed` and `tnr` propagation

* introduce TransformedResult helper

* fix docs

* restore transform as alias to transform_up

* restore transform as alias to transform_up 2

* Simplifications and comment improvements (#2)

---------

Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>