Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
match join.join_type {
Left => (Some(limit), None),
Right => (None, Some(limit)),
Full => (Some(limit), Some(limit)),
_ => (None, None),
}
};
Expand Down
159 changes: 147 additions & 12 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t0.c1 = t1.c1
03)----Limit: skip=0, fetch=2
04)------TableScan: t0 projection=[c1, c2], fetch=2
05)----Limit: skip=0, fetch=2
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
03)----TableScan: t0 projection=[c1, c2]
04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
Expand All @@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: Filter: t0.c2 >= t1.c2
03)----Limit: skip=0, fetch=2
04)------TableScan: t0 projection=[c1, c2], fetch=2
05)----Limit: skip=0, fetch=2
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
03)----TableScan: t0 projection=[c1, c2]
04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=2
02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
Expand All @@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2
03)----Limit: skip=0, fetch=2
04)------TableScan: t0 projection=[c1, c2], fetch=2
05)----Limit: skip=0, fetch=2
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
03)----TableScan: t0 projection=[c1, c2]
04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]

## Add more test cases for join limit pushdown
statement ok
drop table t1

## Test limit pushdown through OUTER JOIN including left/right and full outer join cases
statement ok
set datafusion.execution.target_partitions = 1;

### Limit pushdown through join

# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest
# bugs if limits are improperly pushed down)
query I
COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv'
STORED AS CSV
----
5

# store t2 in different order so the top N rows are not the same as the top N rows of t1
query I
COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv'
STORED AS CSV
----
5

statement ok
create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv';

statement ok
create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv';

######
## LEFT JOIN w/ LIMIT
######
query II
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
2 2
1 1

# the output of this query should be two rows from the previous query
# there should be no nulls
query II
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
2 2
1 1

# can only push down to t1 (preserved side)
query TT
explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
logical_plan
01)Limit: skip=0, fetch=2
02)--Left Join: t1.a = t2.b
03)----Limit: skip=0, fetch=2
04)------TableScan: t1 projection=[a], fetch=2
05)----TableScan: t2 projection=[b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true

######
## RIGHT JOIN w/ LIMIT
######

query II
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
5 5
4 4

# the output of this query should be two rows from the previous query
# there should be no nulls
query II
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
5 5
4 4

# can only push down to t2 (preserved side)
query TT
explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
----
logical_plan
01)Limit: skip=0, fetch=2
02)--Right Join: t1.a = t2.b
03)----TableScan: t1 projection=[a]
04)----Limit: skip=0, fetch=2
05)------TableScan: t2 projection=[b], fetch=2
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true

######
## FULL JOIN w/ LIMIT
######
query II rowsort
select * from t1 FULL JOIN t2 ON t1.a = t2.b;
----
1 1
2 2
3 3
4 4
5 5

# the output of this query should be two rows from the previous query
# there should be no nulls
# Reproducer for https://github.com/apache/datafusion/issues/14335
query II
select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the test will be flaky. (Maybe explain is enough)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in theory the results from this query is undefined (in the sense that any two rows would be valid)

However, given we are limiting to a single core via

set datafusion.execution.target_partitions = 1;

I think this won't be flaky in practice

Of course I am somewhat biased as I wrote these tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, make sense, I didn't notice the setting

----
5 5
4 4


# can't push limit for full outer join
query TT
explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
----
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t1.a = t2.b
03)----TableScan: t1 projection=[a]
04)----TableScan: t2 projection=[b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true

statement ok
drop table t1;

statement ok
drop table t2;

# Test Utf8View as Join Key
# Issue: https://github.com/apache/datafusion/issues/12468
statement ok
Expand Down
Loading