Support limit pushdown through left right outer join #2580

Closed
wants to merge 10 commits into from

Ted-Jiang
Member

Which issue does this PR close?

run

explain select * from order left join item on order.o_orderkey = item.l_orderkey limit 1;

before

| logical_plan  | Limit: 1 |
|               |   Projection: #order.o_orderkey, #order.o_custkey, #order.o_orderstatus, #order.o_totalprice, #order.o_orderdate, #order.o_orderpriority, #order.o_clerk, #order.o_shippriority, #order.o_comment, #item.l_orderkey, #item.l_partkey, #item.l_suppkey, #item.l_linenumber, #item.l_quantity, #item.l_extendedprice, #item.l_discount, #item.l_tax, #item.l_returnflag, #item.l_linestatus, #item.l_shipdate, #item.l_commitdate, #item.l_receiptdate, #item.l_shipinstruct, #item.l_shipmode, #item.l_comment |
|               |     Left Join: #order.o_orderkey = #item.l_orderkey |
|               |       TableScan: order projection=Some([0, 1, 2, 3, 4, 5, 6, 7, 8]) |
|               |       TableScan: item projection=Some([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) |

after:

| logical_plan  | Limit: 1 |
|               |   Projection: #order.o_orderkey, #order.o_custkey, #order.o_orderstatus, #order.o_totalprice, #order.o_orderdate, #order.o_orderpriority, #order.o_clerk, #order.o_shippriority, #order.o_comment, #item.l_orderkey, #item.l_partkey, #item.l_suppkey, #item.l_linenumber, #item.l_quantity, #item.l_extendedprice, #item.l_discount, #item.l_tax, #item.l_returnflag, #item.l_linestatus, #item.l_shipdate, #item.l_commitdate, #item.l_receiptdate, #item.l_shipinstruct, #item.l_shipmode, #item.l_comment |
|               |     Left Join: #order.o_orderkey = #item.l_orderkey |
|               |       TableScan: order projection=Some([0, 1, 2, 3, 4, 5, 6, 7, 8]), limit=1 |
|               |       TableScan: item projection=Some([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) |
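
The plan change above can be sketched as a rewrite over a toy plan tree. This is only an illustration in Python, not the code in this PR: the `Scan`, `Join`, `Projection`, and `Limit` classes and the `push_limit` function are hypothetical names invented for the example. It assumes the bound may pass through a projection and into the row-preserving side of an outer join, while the outer `Limit` stays in place because the join can still emit more rows than the bound.

```python
from dataclasses import dataclass, replace
from typing import Optional


# Hypothetical toy plan nodes; these are not DataFusion types.
@dataclass(frozen=True)
class Scan:
    table: str
    limit: Optional[int] = None


@dataclass(frozen=True)
class Join:
    kind: str  # "left", "right", or "inner"
    left: object
    right: object


@dataclass(frozen=True)
class Projection:
    input: object


@dataclass(frozen=True)
class Limit:
    n: int
    input: object


def push_limit(plan, bound=None):
    """Return a copy of `plan` with the row bound pushed toward the scans."""
    if isinstance(plan, Limit):
        inner = plan.n if bound is None else min(bound, plan.n)
        # Keep the Limit node itself: the join below may emit more rows.
        return Limit(plan.n, push_limit(plan.input, inner))
    if isinstance(plan, Projection):
        # A projection does not change the number of rows.
        return Projection(push_limit(plan.input, bound))
    if isinstance(plan, Join):
        # Only the row-preserving side of an outer join may take the bound.
        left_bound = bound if plan.kind == "left" else None
        right_bound = bound if plan.kind == "right" else None
        return Join(
            plan.kind,
            push_limit(plan.left, left_bound),
            push_limit(plan.right, right_bound),
        )
    if isinstance(plan, Scan):
        if bound is None:
            return plan
        new_limit = bound if plan.limit is None else min(plan.limit, bound)
        return replace(plan, limit=new_limit)
    raise TypeError(f"unsupported plan node: {plan!r}")


plan = Limit(1, Projection(Join("left", Scan("order"), Scan("item"))))
optimized = push_limit(plan)
print(optimized)
```

In this sketch only the `order` scan receives `limit=1`, which mirrors the "after" plan; an inner join is left untouched.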

Closes #2579.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label May 21, 2022
Ted-Jiang and others added 2 commits May 21, 2022 17:23
apache#2569)

* add scan_empty method to tests

* update tests to use new scan_empty test method

* remove LogicalPlanBuilder::scan_empty

* LogicalPlanBuilder now uses TableSource instead of TableProvider
Contributor

@alamb alamb left a comment

Thank you for the contribution @Ted-Jiang, however I don't think this is a valid optimization.

Specifically, because a join can filter out rows, if you limit the input you may actually end up with fewer output rows than the limit.

Consider this input:

left:

l
1
2
...
100

right:

r
99
100

The output of select * from left LEFT JOIN right ON (l = r) should be:

l r
99 99
100 100

However, if you push the limit down to the scan on left, it would send only this into the JOIN:

left:

l
1
2

And thus would produce no output. 

If we want to optimize limits in Joins, I think it would have to be done in the Join Operator itself (to stop producing rows once the limit is hit). However, as long as the Join operator is producing rows in batches, the effect of implementing an internal limit will likely be small (because it would save only a part of one output batch, when the limit on the output of a join is hit)
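
The point about batches can be illustrated with a small simulation. This is a sketch in Python, not DataFusion code: `join_batches`, `limit`, and the batch size of 8192 are hypothetical choices made for the example. It assumes the join streams its output lazily and that the limit operator stops pulling as soon as it has enough rows.

```python
def join_batches(num_rows, batch_size, produced):
    """Yield join output in batches, recording the size of each batch built."""
    for start in range(0, num_rows, batch_size):
        batch = list(range(start, min(start + batch_size, num_rows)))
        produced.append(len(batch))
        yield batch


def limit(batches, n):
    """Pass rows through until `n` rows have been emitted, then stop pulling."""
    remaining = n
    for batch in batches:
        out = batch[:remaining]
        remaining -= len(out)
        yield out
        if remaining == 0:
            return


produced = []  # sizes of the batches the join actually materialized
rows = [row for batch in limit(join_batches(100_000, 8192, produced), 10) for row in batch]
wasted = sum(produced) - len(rows)
print(len(rows), produced, wasted)  # prints: 10 [8192] 8182
```

Under these assumptions the join builds a single batch before the limit stops it, so a limit inside the join operator could avoid at most the unused remainder of that one batch.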

@Ted-Jiang
Member Author

Ted-Jiang commented May 21, 2022


I think that describes select * from left LEFT JOIN right ON (l = r) without a limit. In that case no limit is pushed to the left table scan, so it will still produce:

left:

l
1

...
100

But this rule applies to select * from left LEFT JOIN right ON (l = r) limit 2. There the left table scan gets the limit and sends two rows to the join, which I think is right, and the result will be:

l r
1 Null
2 Null

@alamb does this make sense?
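
The case above can be checked with a small simulation of the two tables from this thread. This is a sketch in Python with hypothetical helpers (`left_outer_join`, `inner_join`); it assumes a plain equality join on single-column tables and that a limit without an ORDER BY may return any rows of the full result.

```python
def left_outer_join(left, right):
    """Keep every left row; pad with None when no right row matches."""
    rows = []
    for l in left:
        matches = [r for r in right if r == l]
        if matches:
            rows.extend((l, r) for r in matches)
        else:
            rows.append((l, None))
    return rows


def inner_join(left, right):
    """Keep only the pairs that match."""
    return [(l, r) for l in left for r in right if l == r]


left = list(range(1, 101))  # 1, 2, ..., 100
right = [99, 100]
n = 2  # the LIMIT

# Left outer join: limiting the left input first still yields n valid rows.
full = left_outer_join(left, right)
pushed = left_outer_join(left[:n], right)[:n]
print(pushed)  # prints: [(1, None), (2, None)]

# Inner join: the same pushdown loses rows, because unmatched rows are dropped.
inner_expected = inner_join(left, right)[:n]
inner_pushed = inner_join(left[:n], right)[:n]
print(inner_expected, inner_pushed)  # prints: [(99, 99), (100, 100)] []
```

Every row in `pushed` also appears in the unlimited left join result, so the pushed-down plan returns a valid answer for limit 2. The inner join case shows why the same rewrite would not be safe there.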

Member Author

@Ted-Jiang Ted-Jiang left a comment

One question: in DataFusion, is LEFT JOIN equivalent to LEFT OUTER JOIN?

@alamb
Contributor

alamb commented May 21, 2022

One question is df LEFT JOIN is equal to LEFT OUTER JOIN?

Yes

Contributor

@alamb alamb left a comment

Yes, you are right @Ted-Jiang -- I was confused about the side of the join 🤦

Upon more thought I think this is correct.

Thank you again.

I'll leave it open for another day or so before merging in case anyone else has thoughts

@Ted-Jiang
Member Author

Thanks @alamb ❤️.
I found this rule in Spark:
https://github.com/apache/spark/blob/efe43306fcab18f076f755c81c0406ebc1a5fee9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L707

Member

@jackwener jackwener left a comment

LGTM.
It can reduce the data read from the source on one side of the join.
Great job❤️.

Co-authored-by: jakevin <30525741+jackwener@users.noreply.github.com>
@alamb
Contributor

alamb commented May 22, 2022

it seems as if there is a clippy error now 😢

@Ted-Jiang Ted-Jiang closed this May 22, 2022
@Ted-Jiang Ted-Jiang reopened this May 22, 2022
@alamb
Contributor

alamb commented May 22, 2022

And now there is a conflict 🤦

# Conflicts:
#	datafusion/core/src/optimizer/limit_push_down.rs
@github-actions github-actions bot added development-process Related to development process of DataFusion documentation Improvements or additions to documentation labels May 23, 2022
@Ted-Jiang Ted-Jiang closed this May 23, 2022
@alamb
Contributor

alamb commented May 23, 2022

reopened as part of #2596

@alamb
Contributor

alamb commented May 23, 2022

[Screenshot: Screen Shot 2022-05-23 at 2 38 36 PM]

Strangely github won't let this PR be reopened 🤔

@alamb
Contributor

alamb commented May 23, 2022

#2596 is now merged 👏

Labels: datafusion (Changes in the datafusion crate), development-process (Related to development process of DataFusion), documentation (Improvements or additions to documentation)
Development

Successfully merging this pull request may close these issues.

Push Limit through outer Join
4 participants