Skip to content

Commit 8d726a7

Browse files
zhuqi-lucasDandandan
authored andcommitted
fix: FULL OUTER JOIN and LIMIT produces wrong results (apache#14338)
* fix: FULL OUTER JOIN and LIMIT produces wrong results * Fix minor slt testing * fix test
1 parent a4d0592 commit 8d726a7

File tree

2 files changed

+264
-13
lines changed

2 files changed

+264
-13
lines changed

datafusion/optimizer/src/push_down_limit.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
255255
match join.join_type {
256256
Left => (Some(limit), None),
257257
Right => (None, Some(limit)),
258-
Full => (Some(limit), Some(limit)),
259258
_ => (None, None),
260259
}
261260
};

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 264 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4234,10 +4234,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
42344234
logical_plan
42354235
01)Limit: skip=0, fetch=2
42364236
02)--Full Join: t0.c1 = t1.c1
4237-
03)----Limit: skip=0, fetch=2
4238-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4239-
05)----Limit: skip=0, fetch=2
4240-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4237+
03)----TableScan: t0 projection=[c1, c2]
4238+
04)----TableScan: t1 projection=[c1, c2, c3]
42414239
physical_plan
42424240
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
42434241
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
@@ -4251,10 +4249,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
42514249
logical_plan
42524250
01)Limit: skip=0, fetch=2
42534251
02)--Full Join: Filter: t0.c2 >= t1.c2
4254-
03)----Limit: skip=0, fetch=2
4255-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4256-
05)----Limit: skip=0, fetch=2
4257-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4252+
03)----TableScan: t0 projection=[c1, c2]
4253+
04)----TableScan: t1 projection=[c1, c2, c3]
42584254
physical_plan
42594255
01)GlobalLimitExec: skip=0, fetch=2
42604256
02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
@@ -4268,12 +4264,268 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT
42684264
logical_plan
42694265
01)Limit: skip=0, fetch=2
42704266
02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2
4271-
03)----Limit: skip=0, fetch=2
4272-
04)------TableScan: t0 projection=[c1, c2], fetch=2
4273-
05)----Limit: skip=0, fetch=2
4274-
06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
4267+
03)----TableScan: t0 projection=[c1, c2]
4268+
04)----TableScan: t1 projection=[c1, c2, c3]
42754269
physical_plan
42764270
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
42774271
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
42784272
03)----MemoryExec: partitions=1, partition_sizes=[1]
42794273
04)----MemoryExec: partitions=1, partition_sizes=[1]
4274+
4275+
## Add more test cases for join limit pushdown
4276+
statement ok
4277+
drop table t1
4278+
4279+
## Test limit pushdown through OUTER JOIN including left/right and full outer join cases
4280+
statement ok
4281+
set datafusion.execution.target_partitions = 1;
4282+
4283+
### Limit pushdown through join
4284+
4285+
# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest
4286+
# bugs if limits are improperly pushed down)
4287+
query I
4288+
COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv'
4289+
STORED AS CSV
4290+
----
4291+
5
4292+
4293+
# store t2 in different order so the top N rows are not the same as the top N rows of t1
4294+
query I
4295+
COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv'
4296+
STORED AS CSV
4297+
----
4298+
5
4299+
4300+
statement ok
4301+
create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv';
4302+
4303+
statement ok
4304+
create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv';
4305+
4306+
######
4307+
## LEFT JOIN w/ LIMIT
4308+
######
4309+
query II
4310+
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4311+
----
4312+
2 2
4313+
1 1
4314+
4315+
# the output of this query should be two rows from the previous query
4316+
# there should be no nulls
4317+
query II
4318+
select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4319+
----
4320+
2 2
4321+
1 1
4322+
4323+
# can only push down to t1 (preserved side)
4324+
query TT
4325+
explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
4326+
----
4327+
logical_plan
4328+
01)Limit: skip=0, fetch=2
4329+
02)--Left Join: t1.a = t2.b
4330+
03)----Limit: skip=0, fetch=2
4331+
04)------TableScan: t1 projection=[a], fetch=2
4332+
05)----TableScan: t2 projection=[b]
4333+
physical_plan
4334+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4335+
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
4336+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true
4337+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
4338+
4339+
######
4340+
## RIGHT JOIN w/ LIMIT
4341+
######
4342+
4343+
query II
4344+
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4345+
----
4346+
5 5
4347+
4 4
4348+
4349+
# the output of this query should be two rows from the previous query
4350+
# there should be no nulls
4351+
query II
4352+
select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4353+
----
4354+
5 5
4355+
4 4
4356+
4357+
# can only push down to t2 (preserved side)
4358+
query TT
4359+
explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
4360+
----
4361+
logical_plan
4362+
01)Limit: skip=0, fetch=2
4363+
02)--Right Join: t1.a = t2.b
4364+
03)----TableScan: t1 projection=[a]
4365+
04)----Limit: skip=0, fetch=2
4366+
05)------TableScan: t2 projection=[b], fetch=2
4367+
physical_plan
4368+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4369+
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
4370+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
4371+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true
4372+
4373+
######
4374+
## FULL JOIN w/ LIMIT
4375+
######
4376+
query II rowsort
4377+
select * from t1 FULL JOIN t2 ON t1.a = t2.b;
4378+
----
4379+
1 1
4380+
2 2
4381+
3 3
4382+
4 4
4383+
5 5
4384+
4385+
# the output of this query should be two rows from the previous query
4386+
# there should be no nulls
4387+
# Reproducer for https://github.com/apache/datafusion/issues/14335
4388+
query II
4389+
select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
4390+
----
4391+
5 5
4392+
4 4
4393+
4394+
4395+
# can't push limit for full outer join
4396+
query TT
4397+
explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
4398+
----
4399+
logical_plan
4400+
01)Limit: skip=0, fetch=2
4401+
02)--Full Join: t1.a = t2.b
4402+
03)----TableScan: t1 projection=[a]
4403+
04)----TableScan: t2 projection=[b]
4404+
physical_plan
4405+
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
4406+
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
4407+
03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
4408+
04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
4409+
4410+
statement ok
4411+
drop table t1;
4412+
4413+
statement ok
4414+
drop table t2;
4415+
4416+
# Test Utf8View as Join Key
4417+
# Issue: https://github.com/apache/datafusion/issues/12468
4418+
statement ok
4419+
CREATE TABLE table1(v1 STRING) AS VALUES ('foo'), (NULL);
4420+
4421+
statement ok
4422+
CREATE TABLE table1_stringview AS SELECT arrow_cast(v1, 'Utf8View') AS v1 FROM table1;
4423+
4424+
query T
4425+
select * from table1 as t1 natural join table1_stringview as t2;
4426+
----
4427+
foo
4428+
4429+
query TT
4430+
EXPLAIN SELECT count(*)
4431+
FROM my_catalog.my_schema.table_with_many_types AS l
4432+
JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col = r.binary_col
4433+
----
4434+
logical_plan
4435+
01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
4436+
02)--Projection:
4437+
03)----Inner Join: l.binary_col = r.binary_col
4438+
04)------SubqueryAlias: l
4439+
05)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
4440+
06)------SubqueryAlias: r
4441+
07)--------TableScan: my_catalog.my_schema.table_with_many_types projection=[binary_col]
4442+
physical_plan
4443+
01)AggregateExec: mode=Single, gby=[], aggr=[count(*)]
4444+
02)--ProjectionExec: expr=[]
4445+
03)----CoalesceBatchesExec: target_batch_size=3
4446+
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0, binary_col@0)]
4447+
05)--------MemoryExec: partitions=1, partition_sizes=[1]
4448+
06)--------MemoryExec: partitions=1, partition_sizes=[1]
4449+
4450+
# Test hash join sort push down
4451+
# Issue: https://github.com/apache/datafusion/issues/13559
4452+
statement ok
4453+
CREATE TABLE test(a INT, b INT, c INT)
4454+
4455+
statement ok
4456+
insert into test values (1,2,3), (4,5,6), (null, 7, 8), (8, null, 9), (9, 10, null)
4457+
4458+
statement ok
4459+
set datafusion.execution.target_partitions = 2;
4460+
4461+
query TT
4462+
explain select * from test where a in (select a from test where b > 3) order by c desc nulls first;
4463+
----
4464+
logical_plan
4465+
01)Sort: test.c DESC NULLS FIRST
4466+
02)--LeftSemi Join: test.a = __correlated_sq_1.a
4467+
03)----TableScan: test projection=[a, b, c]
4468+
04)----SubqueryAlias: __correlated_sq_1
4469+
05)------Projection: test.a
4470+
06)--------Filter: test.b > Int32(3)
4471+
07)----------TableScan: test projection=[a, b]
4472+
physical_plan
4473+
01)SortPreservingMergeExec: [c@2 DESC]
4474+
02)--CoalesceBatchesExec: target_batch_size=3
4475+
03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
4476+
04)------CoalesceBatchesExec: target_batch_size=3
4477+
05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4478+
06)----------CoalesceBatchesExec: target_batch_size=3
4479+
07)------------FilterExec: b@1 > 3, projection=[a@0]
4480+
08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4481+
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
4482+
10)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
4483+
11)--------CoalesceBatchesExec: target_batch_size=3
4484+
12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4485+
13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4486+
14)--------------MemoryExec: partitions=1, partition_sizes=[1]
4487+
4488+
query TT
4489+
explain select * from test where a in (select a from test where b > 3) order by c desc nulls last;
4490+
----
4491+
logical_plan
4492+
01)Sort: test.c DESC NULLS LAST
4493+
02)--LeftSemi Join: test.a = __correlated_sq_1.a
4494+
03)----TableScan: test projection=[a, b, c]
4495+
04)----SubqueryAlias: __correlated_sq_1
4496+
05)------Projection: test.a
4497+
06)--------Filter: test.b > Int32(3)
4498+
07)----------TableScan: test projection=[a, b]
4499+
physical_plan
4500+
01)SortPreservingMergeExec: [c@2 DESC NULLS LAST]
4501+
02)--CoalesceBatchesExec: target_batch_size=3
4502+
03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
4503+
04)------CoalesceBatchesExec: target_batch_size=3
4504+
05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4505+
06)----------CoalesceBatchesExec: target_batch_size=3
4506+
07)------------FilterExec: b@1 > 3, projection=[a@0]
4507+
08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4508+
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
4509+
10)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
4510+
11)--------CoalesceBatchesExec: target_batch_size=3
4511+
12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
4512+
13)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
4513+
14)--------------MemoryExec: partitions=1, partition_sizes=[1]
4514+
4515+
query III
4516+
select * from test where a in (select a from test where b > 3) order by c desc nulls first;
4517+
----
4518+
9 10 NULL
4519+
4 5 6
4520+
4521+
query III
4522+
select * from test where a in (select a from test where b > 3) order by c desc nulls last;
4523+
----
4524+
4 5 6
4525+
9 10 NULL
4526+
4527+
statement ok
4528+
DROP TABLE test
4529+
4530+
statement ok
4531+
set datafusion.execution.target_partitions = 1;

0 commit comments

Comments
 (0)