@@ -123,127 +123,6 @@ object AnalysisContext {
123
123
}
124
124
}
125
125
126
- object Analyzer {
127
-
128
- /**
129
- * Rewrites a given `plan` recursively based on rewrite mappings from old plans to new ones.
130
- * This method also updates all the related references in the `plan` accordingly.
131
- *
132
- * @param plan to rewrite
133
- * @param rewritePlanMap has mappings from old plans to new ones for the given `plan`.
134
- * @return a rewritten plan and updated references related to a root node of
135
- * the given `plan` for rewriting it.
136
- */
137
- def rewritePlan (plan : LogicalPlan , rewritePlanMap : Map [LogicalPlan , LogicalPlan ])
138
- : (LogicalPlan , Seq [(Attribute , Attribute )]) = {
139
- if (plan.resolved) {
140
- val attrMapping = new mutable.ArrayBuffer [(Attribute , Attribute )]()
141
- val newChildren = plan.children.map { child =>
142
- // If not, we'd rewrite child plan recursively until we find the
143
- // conflict node or reach the leaf node.
144
- val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
145
- attrMapping ++= childAttrMapping.filter { case (oldAttr, _) =>
146
- // `attrMapping` is not only used to replace the attributes of the current `plan`,
147
- // but also to be propagated to the parent plans of the current `plan`. Therefore,
148
- // the `oldAttr` must be part of either `plan.references` (so that it can be used to
149
- // replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
150
- // used by those parent plans).
151
- (plan.outputSet ++ plan.references).contains(oldAttr)
152
- }
153
- newChild
154
- }
155
-
156
- val newPlan = if (rewritePlanMap.contains(plan)) {
157
- rewritePlanMap(plan).withNewChildren(newChildren)
158
- } else {
159
- plan.withNewChildren(newChildren)
160
- }
161
-
162
- assert(! attrMapping.groupBy(_._1.exprId)
163
- .exists(_._2.map(_._2.exprId).distinct.length > 1 ),
164
- " Found duplicate rewrite attributes" )
165
-
166
- val attributeRewrites = AttributeMap (attrMapping)
167
- // Using attrMapping from the children plans to rewrite their parent node.
168
- // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
169
- val p = newPlan.transformExpressions {
170
- case a : Attribute =>
171
- updateAttr(a, attributeRewrites)
172
- case s : SubqueryExpression =>
173
- s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attributeRewrites))
174
- }
175
- attrMapping ++= plan.output.zip(p.output)
176
- .filter { case (a1, a2) => a1.exprId != a2.exprId }
177
- p -> attrMapping
178
- } else {
179
- // Just passes through unresolved nodes
180
- plan.mapChildren {
181
- rewritePlan(_, rewritePlanMap)._1
182
- } -> Nil
183
- }
184
- }
185
-
186
- private def updateAttr (attr : Attribute , attrMap : AttributeMap [Attribute ]): Attribute = {
187
- val exprId = attrMap.getOrElse(attr, attr).exprId
188
- attr.withExprId(exprId)
189
- }
190
-
191
- /**
192
- * The outer plan may have old references and the function below updates the
193
- * outer references to refer to the new attributes.
194
- *
195
- * For example (SQL):
196
- * {{{
197
- * SELECT * FROM t1
198
- * INTERSECT
199
- * SELECT * FROM t1
200
- * WHERE EXISTS (SELECT 1
201
- * FROM t2
202
- * WHERE t1.c1 = t2.c1)
203
- * }}}
204
- * Plan before resolveReference rule.
205
- * 'Intersect
206
- * :- Project [c1#245, c2#246]
207
- * : +- SubqueryAlias t1
208
- * : +- Relation[c1#245,c2#246] parquet
209
- * +- 'Project [*]
210
- * +- Filter exists#257 [c1#245]
211
- * : +- Project [1 AS 1#258]
212
- * : +- Filter (outer(c1#245) = c1#251)
213
- * : +- SubqueryAlias t2
214
- * : +- Relation[c1#251,c2#252] parquet
215
- * +- SubqueryAlias t1
216
- * +- Relation[c1#245,c2#246] parquet
217
- * Plan after the resolveReference rule.
218
- * Intersect
219
- * :- Project [c1#245, c2#246]
220
- * : +- SubqueryAlias t1
221
- * : +- Relation[c1#245,c2#246] parquet
222
- * +- Project [c1#259, c2#260]
223
- * +- Filter exists#257 [c1#259]
224
- * : +- Project [1 AS 1#258]
225
- * : +- Filter (outer(c1#259) = c1#251) => Updated
226
- * : +- SubqueryAlias t2
227
- * : +- Relation[c1#251,c2#252] parquet
228
- * +- SubqueryAlias t1
229
- * +- Relation[c1#259,c2#260] parquet => Outer plan's attributes are rewritten.
230
- */
231
- private def updateOuterReferencesInSubquery (
232
- plan : LogicalPlan ,
233
- attrMap : AttributeMap [Attribute ]): LogicalPlan = {
234
- AnalysisHelper .allowInvokingTransformsInAnalyzer {
235
- plan transformDown { case currentFragment =>
236
- currentFragment transformExpressions {
237
- case OuterReference (a : Attribute ) =>
238
- OuterReference (updateAttr(a, attrMap))
239
- case s : SubqueryExpression =>
240
- s.withNewPlan(updateOuterReferencesInSubquery(s.plan, attrMap))
241
- }
242
- }
243
- }
244
- }
245
- }
246
-
247
126
/**
248
127
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute ]]s and
249
128
* [[UnresolvedRelation ]]s into fully typed objects using information in a [[SessionCatalog ]].
@@ -1376,7 +1255,7 @@ class Analyzer(
1376
1255
if (conflictPlans.isEmpty) {
1377
1256
right
1378
1257
} else {
1379
- Analyzer .rewritePlan(right, conflictPlans.toMap)._1
1258
+ right.rewriteWithPlanMapping( conflictPlans.toMap)
1380
1259
}
1381
1260
}
1382
1261
0 commit comments