
Commit aaa94c4

mihailotim-db authored and cloud-fan committed
[SPARK-51544][SQL] Add only unique and necessary metadata columns
### What changes were proposed in this pull request?

`AddMetadataColumns` should add only unique and necessary metadata columns, not the entire child's metadata output.

### Why are the changes needed?

There are 3 reasons to make this change:
1. Adding duplicates of metadata columns creates problems for the single-pass analyzer, where we need to hack our way around adding these columns, because both `AddMetadataColumns` and `ResolveReferences` can add the same attributes.
2. Adding only unique and necessary metadata columns is more semantically correct.
3. This PR is also a preparation to fix [SPARK-51545](https://issues.apache.org/jira/browse/SPARK-51545).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new suite to test the `AddMetadataColumns` rule; also covered by existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50304 from mihailotim-db/mihailotim-db/unique_metadata_cols.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
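To make the affected query shape concrete, here is a minimal sketch (a spark-shell session is assumed; the `t1`/`t2` tables are illustrative, not part of this patch). A USING join keeps the original join keys around as hidden metadata columns, and selecting a qualified key pulls metadata columns into the inner Project:

```scala
// Illustrative sketch: qualified access to a USING-join key goes through
// metadata columns, which AddMetadataColumns surfaces in the inner Project.
spark.sql("CREATE TABLE t1(k INT, v1 INT)")
spark.sql("CREATE TABLE t2(k INT, v2 INT)")
val analyzed = spark.sql(
  "SELECT t1.k FROM t1 FULL OUTER JOIN t2 USING (k)").queryExecution.analyzed
println(analyzed)
// Before this patch the inner Project carried both original keys
// (exprIds below are illustrative):
//   +- Project [coalesce(k#0, k#2) AS k#4, v1#1, v2#3, k#0, k#2]
// After, it carries only the key that is actually referenced:
//   +- Project [coalesce(k#0, k#2) AS k#4, v1#1, v2#3, k#0]
```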
1 parent f487515 commit aaa94c4

File tree

5 files changed: +225 −31 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 40 additions & 21 deletions
@@ -985,25 +985,30 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object AddMetadataColumns extends Rule[LogicalPlan] {
     import org.apache.spark.sql.catalyst.util._
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
-      AlwaysProcess.fn, ruleId) {
-      case hint: UnresolvedHint => hint
-      // Add metadata output to all node types
-      case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
-        val inputAttrs = AttributeSet(node.children.flatMap(_.output))
-        val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
-        if (metaCols.isEmpty) {
-          node
-        } else {
-          val newNode = node.mapChildren(addMetadataCol(_, metaCols.map(_.exprId).toSet))
-          // We should not change the output schema of the plan. We should project away the extra
-          // metadata columns if necessary.
-          if (newNode.sameOutput(node)) {
-            newNode
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val onlyUniqueAndNecessaryMetadataColumns =
+        conf.getConf(SQLConf.ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS)
+      plan.resolveOperatorsDownWithPruning(AlwaysProcess.fn, ruleId) {
+        case hint: UnresolvedHint => hint
+        // Add metadata output to all node types
+        case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
+          val inputAttrs = AttributeSet(node.children.flatMap(_.output))
+          val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
+          if (metaCols.isEmpty) {
+            node
           } else {
-            Project(node.output, newNode)
+            val newNode = node.mapChildren(
+              addMetadataCol(_, metaCols.map(_.exprId).toSet, onlyUniqueAndNecessaryMetadataColumns)
+            )
+            // We should not change the output schema of the plan. We should project away the extra
+            // metadata columns if necessary.
+            if (newNode.sameOutput(node)) {
+              newNode
+            } else {
+              Project(node.output, newNode)
+            }
           }
-        }
+      }
     }
 
     private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
@@ -1031,18 +1036,32 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     private def addMetadataCol(
         plan: LogicalPlan,
-        requiredAttrIds: Set[ExprId]): LogicalPlan = plan match {
+        requiredAttrIds: Set[ExprId],
+        onlyUniqueAndNecessaryMetadataColumns: Boolean = true): LogicalPlan = plan match {
       case s: ExposesMetadataColumns if s.metadataOutput.exists( a =>
         requiredAttrIds.contains(a.exprId)) =>
         s.withMetadataColumns()
       case p: Project if p.metadataOutput.exists(a => requiredAttrIds.contains(a.exprId)) =>
+        val uniqueMetadataColumns = if (onlyUniqueAndNecessaryMetadataColumns) {
+          val actualRequiredExprIds = new util.HashSet[ExprId](requiredAttrIds.asJava)
+          p.projectList.foreach(ne => actualRequiredExprIds.remove(ne.exprId))
+          p.metadataOutput.filter(attr => actualRequiredExprIds.contains(attr.exprId))
+        } else {
+          p.metadataOutput
+        }
+
        val newProj = p.copy(
          // Do not leak the qualified-access-only restriction to normal plan outputs.
-          projectList = p.projectList ++ p.metadataOutput.map(_.markAsAllowAnyAccess()),
-          child = addMetadataCol(p.child, requiredAttrIds))
+          projectList = p.projectList ++ uniqueMetadataColumns.map(_.markAsAllowAnyAccess()),
+          child = addMetadataCol(p.child, requiredAttrIds, onlyUniqueAndNecessaryMetadataColumns)
+        )
        newProj.copyTagsFrom(p)
        newProj
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, requiredAttrIds)))
+      case _ =>
+        plan.withNewChildren(
+          plan.children
+            .map(addMetadataCol(_, requiredAttrIds, onlyUniqueAndNecessaryMetadataColumns))
+        )
     }
   }
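The core of the new behavior is a set difference over expression IDs: drop every required ID that the Project already produces, then append only the metadata attributes that are still missing. Below is a minimal standalone sketch; `Attr` and `ExprId` here are simplified stand-ins, not the real Catalyst classes (the actual rule uses a `java.util.HashSet` to the same effect):

```scala
// Simplified sketch of the dedup step in addMetadataCol.
case class ExprId(id: Long)
case class Attr(name: String, exprId: ExprId)

def uniqueMetadataColumns(
    requiredAttrIds: Set[ExprId],
    projectList: Seq[Attr],
    metadataOutput: Seq[Attr]): Seq[Attr] = {
  // IDs still required after accounting for what the Project already emits.
  val stillRequired = requiredAttrIds -- projectList.map(_.exprId)
  // Keep only metadata attributes that are required and not yet present.
  metadataOutput.filter(attr => stillRequired.contains(attr.exprId))
}

// The project already emits #0, so only the metadata column #2 is appended.
val result = uniqueMetadataColumns(
  requiredAttrIds = Set(ExprId(0), ExprId(2)),
  projectList = Seq(Attr("k", ExprId(0))),
  metadataOutput = Seq(Attr("k", ExprId(0)), Attr("k", ExprId(2))))
assert(result == Seq(Attr("k", ExprId(2))))
```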

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -241,6 +241,15 @@ object SQLConf {
     }
   }
 
+  val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
+    buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
+      .internal()
+      .doc(
+        "When this conf is enabled, AddMetadataColumns rule should only add necessary metadata " +
+        "columns and only if those columns are not already present in the project list.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
     .internal()
     .doc("The max number of iterations the analyzer runs.")

sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out

Lines changed: 4 additions & 4 deletions
@@ -1025,10 +1025,10 @@ Project [a#x, a#x, z2#x]
    +- PipeOperator
       +- Filter (z2#x = 0)
          +- PipeOperator
-            +- Project [a#x, z2#x, a#x, a#x]
-               +- Project [a#x, z1#x, (a#x - a#x) AS z2#x, a#x, a#x]
-                  +- Project [a#x, (a#x + a#x) AS z1#x, a#x, a#x, a#x]
-                     +- Project [a#x, a#x, a#x, a#x, a#x]
+            +- Project [a#x, z2#x, a#x]
+               +- Project [a#x, z1#x, (a#x - a#x) AS z2#x, a#x]
+                  +- Project [a#x, (a#x + a#x) AS z1#x, a#x]
+                     +- Project [a#x, a#x]
                        +- Join Inner, (a#x = a#x)
                           :- SubqueryAlias lhs
                           :  +- LocalRelation [a#x]

sql/core/src/test/resources/sql-tests/analyzer-results/using-join.sql.out

Lines changed: 6 additions & 6 deletions
@@ -568,7 +568,7 @@ SELECT k FROM (SELECT nt2.k FROM nt1 full outer join nt2 using (k))
 Project [k#x]
 +- SubqueryAlias __auto_generated_subquery_name
    +- Project [k#x]
-      +- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x, k#x]
+      +- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x]
          +- Join FullOuter, (k#x = k#x)
             :- SubqueryAlias nt1
             :  +- View (`nt1`, [k#x, v1#x])
@@ -589,7 +589,7 @@ SELECT nt2.k AS key FROM nt1 full outer join nt2 using (k) ORDER BY key
 -- !query analysis
 Sort [key#x ASC NULLS FIRST], true
 +- Project [k#x AS key#x]
-   +- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x, k#x]
+   +- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x]
       +- Join FullOuter, (k#x = k#x)
          :- SubqueryAlias nt1
          :  +- View (`nt1`, [k#x, v1#x])
@@ -609,7 +609,7 @@ Sort [key#x ASC NULLS FIRST], true
 SELECT k, nt1.k FROM nt1 full outer join nt2 using (k)
 -- !query analysis
 Project [k#x, k#x]
-+- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x, k#x]
++- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x]
    +- Join FullOuter, (k#x = k#x)
       :- SubqueryAlias nt1
       :  +- View (`nt1`, [k#x, v1#x])
@@ -629,7 +629,7 @@ Project [k#x, k#x]
 SELECT k, nt2.k FROM nt1 full outer join nt2 using (k)
 -- !query analysis
 Project [k#x, k#x]
-+- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x, k#x]
++- Project [coalesce(k#x, k#x) AS k#x, v1#x, v2#x, k#x]
    +- Join FullOuter, (k#x = k#x)
       :- SubqueryAlias nt1
       :  +- View (`nt1`, [k#x, v1#x])
@@ -828,9 +828,9 @@ WithCTE
 :     +- SubqueryAlias t
 :        +- LocalRelation [key#x]
 +- Project [key#x]
-   +- Project [key#x, key#x, key#x]
+   +- Project [key#x, key#x]
       +- Filter NOT key#x LIKE bb.%
-         +- Project [coalesce(key#x, key#x) AS key#x, key#x, key#x, key#x]
+         +- Project [coalesce(key#x, key#x) AS key#x, key#x]
             +- Join FullOuter, (key#x = key#x)
               :- SubqueryAlias t1
               :  +- CTERelationRef xxxx, true, [key#x], false, false, 1
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/AddMetadataColumnsSuite.scala

Lines changed: 166 additions & 0 deletions
@@ -0,0 +1,166 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class AddMetadataColumnsSuite extends QueryTest with SharedSparkSession {

  test("Add only necessary metadata columns") {
    // For a query like:
    //
    // {{{
    //   SELECT t1.k
    //   FROM VALUES(1,2) AS t1(k,v1) FULL OUTER JOIN VALUES(1,2) AS t2(k,v2) USING (k)
    // }}}
    //
    // the analyzed plan would look like:
    //   Project [k#0]
    //   +- Project [coalesce(k#0, k#2) AS k#4, v1#1, v2#3, k#0, k#2]
    //      +- Join FullOuter, (k#0 = k#2)
    //         :- SubqueryAlias t1
    //         :  +- LocalRelation [k#0, v1#1]
    //         +- SubqueryAlias t2
    //            +- LocalRelation [k#2, v2#3]
    //
    // The inner Project in this case contains a reference to k#2, which is not needed in the
    // top-most Project. With `spark.sql.analyzer.uniqueNecessaryMetadataColumns` set to false,
    // we will add k#2 to the project list because it is a metadata column. Otherwise, we don't
    // need it and can avoid adding it in the AddMetadataColumns rule.
    withTable("t1", "t2") {
      sql("CREATE TABLE t1(k INT, v1 INT)")
      sql("CREATE TABLE t2(k INT, v2 INT)")
      val left = sql("select * from t1")
      val right = sql("select * from t2")
      val join = left.join(right, Seq("k"), "full_outer")

      val rightKeyExprId = right
        .select(right("k"))
        .queryExecution
        .analyzed
        .asInstanceOf[Project]
        .projectList
        .head
        .exprId

      withSQLConf(SQLConf.ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS.key -> "true") {
        // Inner project list shouldn't contain a reference to the right key.
        val analyzed = join.select(left("k")).queryExecution.analyzed
        analyzed match {
          case Project(_, Project(innerProjectList: Seq[NamedExpression], _)) =>
            assert(Seq("k", "v1", "v2", "k") == innerProjectList.map(_.name))
            assert(!innerProjectList.map(_.exprId).contains(rightKeyExprId))
        }
      }

      withSQLConf(SQLConf.ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS.key -> "false") {
        // Inner project list should contain a reference to the right key.
        val analyzed = join.select(left("k")).queryExecution.analyzed
        analyzed match {
          case Project(_, Project(innerProjectList: Seq[NamedExpression], _)) =>
            assert(Seq("k", "v1", "v2", "k", "k") == innerProjectList.map(_.name))
            assert(innerProjectList.map(_.exprId).contains(rightKeyExprId))
        }
      }
    }
  }

  test("Add only unique metadata columns") {
    // For a query like:
    //
    // {{{
    //   SELECT t1.k
    //   FROM VALUES(1,2) AS t1(k,v1) FULL OUTER JOIN VALUES(1,2) AS t2(k,v2) USING (k)
    //   WHERE t1.k IS NOT NULL
    // }}}
    //
    // the analyzed plan will look like:
    //   Project [k#0]
    //   +- Project [k#4, v1#1, v2#3, k#0, k#2]
    //      +- Filter isnotnull(k#0)
    //         +- Project [coalesce(k#0, k#2) AS k#4, v1#1, v2#3, k#0, k#0, k#2]
    //            +- Join FullOuter, (k#0 = k#2)
    //               :- SubqueryAlias t1
    //               :  +- LocalRelation [k#0, v1#1]
    //               +- SubqueryAlias t2
    //                  +- LocalRelation [k#2, v2#3]
    //
    // In this case, the Project under Filter contains a duplicate k#0 attribute reference as
    // well as an unnecessary k#2 attribute reference. Additionally, the second top-most Project
    // has an extra k#2 that can also be removed. The duplicate reference comes from the fact
    // that this attribute is first added by the ResolveReferences rule as missing input, but
    // AddMetadataColumns doesn't respect the fact that this attribute already exists in the
    // project list and duplicates it. With `spark.sql.analyzer.uniqueNecessaryMetadataColumns`
    // set to true, we remove this duplication and the unnecessary attribute.
    withTable("t1", "t2") {
      sql("CREATE TABLE t1(k INT, v1 INT)")
      sql("CREATE TABLE t2(k INT, v2 INT)")
      val left = sql("select * from t1")
      val right = sql("select * from t2")
      val join = left.join(right, Seq("k"), "full_outer")
      val filter = join.filter(left("k").isNotNull)

      val leftKeyExprId = left
        .select(left("k"))
        .queryExecution
        .analyzed
        .asInstanceOf[Project]
        .projectList
        .head
        .exprId
      val rightKeyExprId = right
        .select(right("k"))
        .queryExecution
        .analyzed
        .asInstanceOf[Project]
        .projectList
        .head
        .exprId

      withSQLConf(SQLConf.ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS.key -> "true") {
        // With the conf on, no duplication of the left key and no unnecessary right key.
        val analyzed = filter.select(left("k")).queryExecution.analyzed
        analyzed match {
          case Project(
              _,
              Project(_, Filter(_, Project(innerProjectList: Seq[NamedExpression], _)))
            ) =>
            assert(Seq("k", "v1", "v2", "k") == innerProjectList.map(_.name))
            assert(innerProjectList.map(_.exprId).count(_ == rightKeyExprId) == 0)
            assert(innerProjectList.map(_.exprId).count(_ == leftKeyExprId) == 1)
        }
      }

      withSQLConf(SQLConf.ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS.key -> "false") {
        // With the conf off, a duplicated left key and an unnecessary right key.
        val analyzed = filter.select(left("k")).queryExecution.analyzed
        analyzed match {
          case Project(
              _,
              Project(_, Filter(_, Project(innerProjectList: Seq[NamedExpression], _)))
            ) =>
            assert(Seq("k", "v1", "v2", "k", "k", "k") == innerProjectList.map(_.name))
            assert(innerProjectList.map(_.exprId).count(_ == rightKeyExprId) == 1)
            assert(innerProjectList.map(_.exprId).count(_ == leftKeyExprId) == 2)
        }
      }
    }
  }
}
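A quick way to see both behaviors side by side outside the suite (a sketch, assuming a spark-shell session and the `t1`/`t2` tables from the earlier example):

```scala
// Compare analyzed plans under both settings. Re-create the Dataset after
// each conf change, since analysis runs eagerly when sql() is called.
Seq("true", "false").foreach { flag =>
  spark.conf.set("spark.sql.analyzer.uniqueNecessaryMetadataColumns", flag)
  val df = spark.sql(
    "SELECT t1.k FROM t1 FULL OUTER JOIN t2 USING (k) WHERE t1.k IS NOT NULL")
  println(s"uniqueNecessaryMetadataColumns=$flag:\n${df.queryExecution.analyzed}")
}
```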
