Description
Describe the bug
When a subquery containing an inner join is itself joined to another table, the dynamic filter generated by the enclosing join can be incorrectly pushed down to both tables of the nested join if their column names match, which can produce invalid results.
To Reproduce
copy (select i as k, i as v from generate_series(1, 1000000) t(i)) to 't1.parquet';
copy (select i + 100000 as k, i as v from generate_series(1, 100000) t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';
select *
from (
select t2.k as k, t1.k as k2
from t1
join t2 on t1.v = t2.v
) a
join t2 b on a.k = b.k
where b.v < 10000;
ProjectionExec: ...
HashJoinExec: ...
...
ProjectionExec: ...
HashJoinExec: ...
DataSourceExec t2: ... predicate=DynamicFilter [ k@0 >= 100001 AND k@0 <= 109999 AND hash_lookup ]
DataSourceExec t1: ... DynamicFilter [ ... ] AND DynamicFilter [ k@0 >= 100001 AND k@0 <= 109999 AND hash_lookup ]
In this case, the dynamic filter on column k (DynamicFilter [ k@0 >= 100001 AND k@0 <= 109999 AND hash_lookup ]) is pushed to both t1 and t2, even though it should only be applied to the k used as the enclosing join's key, which comes from t2.
As such, it returns no rows, because the t1 rows that should match have k between 1 and 9999 and are all removed by the filter:
+---+----+---+---+
| k | k2 | k | v |
+---+----+---+---+
+---+----+---+---+
If k of t1 is not projected, the query returns the correct result.
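For reference, a variant without t1.k in the subquery's projection might look like the following. This is a sketch derived from the repro above by dropping the k2 column; the exact query used for this check is not shown in the report.

```sql
-- Same query as the repro, but the subquery no longer projects t1.k (k2),
-- so only t2's k is visible to the enclosing join.
select *
from (
  select t2.k as k
  from t1
  join t2 on t1.v = t2.v
) a
join t2 b on a.k = b.k
where b.v < 10000;
```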
Expected behavior
If we disable the dynamic filters, the query works as expected:
+--------+-----+--------+-----+
| k      | k2  | k      | v   |
+--------+-----+--------+-----+
| 100032 | 32 | 100032 | 32 |
| 100037 | 37 | 100037 | 37 |
| 100060 | 60 | 100060 | 60 |
| 100072 | 72 | 100072 | 72 |
| 100079 | 79 | 100079 | 79 |
| 100085 | 85 | 100085 | 85 |
...
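One way to turn dynamic filters off for a session is sketched below. It assumes the `datafusion.optimizer.enable_dynamic_filter_pushdown` option found in recent DataFusion releases; the report does not say which setting was actually used, so the option name may need adjusting for your version.

```sql
-- Assumption: this option name exists in the DataFusion version under test.
set datafusion.optimizer.enable_dynamic_filter_pushdown = false;

-- Re-run the repro query; the DataSourceExec nodes should no longer
-- carry a DynamicFilter predicate.
select *
from (
  select t2.k as k, t1.k as k2
  from t1
  join t2 on t1.v = t2.v
) a
join t2 b on a.k = b.k
where b.v < 10000;
```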
Additional context
I'm not sure if this happens with just inner joins. I tested with a union as the subquery with switched columns and it correctly identified the right ones to push to:
select *
from (
select k, v
from t2
union all
select v, k
from t2
) a
join t2 b on a.k = b.k
where b.v < 10000;
-- one filters on k while the other filters on v, which is correct
...
UnionExec
DataSourceExec: ... predicate=DynamicFilter [ k@0 >= 100001 AND k@0 <= 109999 AND hash_lookup ],
DataSourceExec: ... predicate=DynamicFilter [ v@1 >= 100001 AND v@1 <= 109999 AND hash_lookup ]