Skip to content

Commit c60b798

Browse files
Fix: Remove Unrelated Fields When Expanding Wildcards in Functional Dependency Projections (#12060)
* Fix exprlist bug

* Update datafusion/expr/src/utils.rs

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>

* Update utils.rs

* Update joins.slt

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 02eab80 commit c60b798

File tree

2 files changed

+85
-2
lines changed

2 files changed

+85
-2
lines changed

datafusion/expr/src/utils.rs

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -838,16 +838,38 @@ pub fn exprlist_len(
838838
qualifier: Some(qualifier),
839839
options,
840840
} => {
841+
let related_wildcard_schema = wildcard_schema.as_ref().map_or_else(
842+
|| Ok(Arc::clone(schema)),
843+
|schema| {
844+
// Eliminate the fields coming from other tables.
845+
let qualified_fields = schema
846+
.fields()
847+
.iter()
848+
.enumerate()
849+
.filter_map(|(idx, field)| {
850+
let (maybe_table_ref, _) = schema.qualified_field(idx);
851+
if maybe_table_ref.map_or(true, |q| q == qualifier) {
852+
Some((maybe_table_ref.cloned(), Arc::clone(field)))
853+
} else {
854+
None
855+
}
856+
})
857+
.collect::<Vec<_>>();
858+
let metadata = schema.metadata().clone();
859+
DFSchema::new_with_metadata(qualified_fields, metadata)
860+
.map(Arc::new)
861+
},
862+
)?;
841863
let excluded = get_excluded_columns(
842864
options.exclude.as_ref(),
843865
options.except.as_ref(),
844-
wildcard_schema.unwrap_or(schema),
866+
related_wildcard_schema.as_ref(),
845867
Some(qualifier),
846868
)?
847869
.into_iter()
848870
.collect::<HashSet<Column>>();
849871
Ok(
850-
get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded)
872+
get_exprs_except_skipped(related_wildcard_schema.as_ref(), excluded)
851873
.len(),
852874
)
853875
}

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4044,3 +4044,64 @@ physical_plan
40444044
03)----MemoryExec: partitions=1, partition_sizes=[1]
40454045
04)----SortExec: TopK(fetch=10), expr=[b@1 ASC NULLS LAST], preserve_partitioning=[false]
40464046
05)------MemoryExec: partitions=1, partition_sizes=[1]
4047+
4048+
4049+
# Functional dependencies across a join
4050+
statement ok
4051+
CREATE TABLE sales_global (
4052+
ts TIMESTAMP,
4053+
sn INTEGER,
4054+
amount INTEGER,
4055+
currency VARCHAR NOT NULL,
4056+
primary key(sn)
4057+
);
4058+
4059+
statement ok
4060+
CREATE TABLE exchange_rates (
4061+
ts TIMESTAMP,
4062+
sn INTEGER,
4063+
currency_from VARCHAR NOT NULL,
4064+
currency_to VARCHAR NOT NULL,
4065+
rate FLOAT,
4066+
primary key(sn)
4067+
);
4068+
4069+
query TT
4070+
EXPLAIN SELECT s.*, s.amount * LAST_VALUE(e.rate) AS amount_usd
4071+
FROM sales_global AS s
4072+
JOIN exchange_rates AS e
4073+
ON s.currency = e.currency_from AND
4074+
e.currency_to = 'USD' AND
4075+
s.ts >= e.ts
4076+
GROUP BY s.sn
4077+
ORDER BY s.sn
4078+
----
4079+
logical_plan
4080+
01)Sort: s.sn ASC NULLS LAST
4081+
02)--Projection: s.ts, s.sn, s.amount, s.currency, CAST(s.amount AS Float32) * last_value(e.rate) AS amount_usd
4082+
03)----Aggregate: groupBy=[[s.sn, s.ts, s.amount, s.currency]], aggr=[[last_value(e.rate)]]
4083+
04)------Projection: s.ts, s.sn, s.amount, s.currency, e.rate
4084+
05)--------Inner Join: s.currency = e.currency_from Filter: s.ts >= e.ts
4085+
06)----------SubqueryAlias: s
4086+
07)------------TableScan: sales_global projection=[ts, sn, amount, currency]
4087+
08)----------SubqueryAlias: e
4088+
09)------------Projection: exchange_rates.ts, exchange_rates.currency_from, exchange_rates.rate
4089+
10)--------------Filter: exchange_rates.currency_to = Utf8("USD")
4090+
11)----------------TableScan: exchange_rates projection=[ts, currency_from, currency_to, rate]
4091+
physical_plan
4092+
01)SortExec: expr=[sn@1 ASC NULLS LAST], preserve_partitioning=[false]
4093+
02)--ProjectionExec: expr=[ts@1 as ts, sn@0 as sn, amount@2 as amount, currency@3 as currency, CAST(amount@2 AS Float32) * last_value(e.rate)@4 as amount_usd]
4094+
03)----AggregateExec: mode=Single, gby=[sn@1 as sn, ts@0 as ts, amount@2 as amount, currency@3 as currency], aggr=[last_value(e.rate)]
4095+
04)------CoalesceBatchesExec: target_batch_size=3
4096+
05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@3, currency_from@1)], filter=ts@0 >= ts@1, projection=[ts@0, sn@1, amount@2, currency@3, rate@6]
4097+
06)----------MemoryExec: partitions=1, partition_sizes=[0]
4098+
07)----------ProjectionExec: expr=[ts@0 as ts, currency_from@1 as currency_from, rate@3 as rate]
4099+
08)------------CoalesceBatchesExec: target_batch_size=3
4100+
09)--------------FilterExec: currency_to@2 = USD
4101+
10)----------------MemoryExec: partitions=1, partition_sizes=[0]
4102+
4103+
statement ok
4104+
DROP TABLE sales_global;
4105+
4106+
statement ok
4107+
DROP TABLE exchange_rates;

0 commit comments

Comments
 (0)