Skip to content

Commit 62c0669

Browse files
mihailotim-db authored and cloud-fan committed
[SPARK-47241][SQL][FOLLOWUP] Fix issue when laterally referencing a Generator
### What changes were proposed in this pull request?
Fix issue when laterally referencing a `Generator`.

### Why are the changes needed?
Fix the following query pattern:
```
WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a test case to `LateralColumnAliasSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50310 from mihailotim-db/mihailotim-db/generator_lca.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 3ffb800 commit 62c0669

File tree

3 files changed

+31
-54
lines changed

3 files changed

+31
-54
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 7 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -2983,20 +2983,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
29832983
}
29842984
}
29852985

2986-
// We must wait until all expressions except for generator functions are resolved before
2987-
// rewriting generator functions in Project/Aggregate. This is necessary to make this rule
2988-
// stable for different execution orders of analyzer rules. See also SPARK-47241.
2989-
private def canRewriteGenerator(namedExprs: Seq[NamedExpression]): Boolean = {
2990-
namedExprs.forall { ne =>
2991-
ne.resolved || {
2992-
trimNonTopLevelAliases(ne) match {
2993-
case AliasedGenerator(_, _, _) => true
2994-
case _ => false
2995-
}
2996-
}
2997-
}
2998-
}
2999-
30002986
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
30012987
_.containsPattern(GENERATOR), ruleId) {
30022988
case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) =>
@@ -3015,8 +3001,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
30153001
val generators = aggList.filter(hasGenerator).map(trimAlias)
30163002
throw QueryCompilationErrors.moreThanOneGeneratorError(generators)
30173003

3018-
case Aggregate(groupList, aggList, child, _) if canRewriteGenerator(aggList) &&
3019-
aggList.exists(hasGenerator) =>
3004+
case Aggregate(groupList, aggList, child, _) if
3005+
aggList.forall {
3006+
case AliasedGenerator(_, _, _) => true
3007+
case other => other.resolved
3008+
} && aggList.exists(hasGenerator) =>
30203009
// If generator in the aggregate list was visited, set the boolean flag true.
30213010
var generatorVisited = false
30223011

@@ -3061,8 +3050,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
30613050
// first for replacing `Project` with `Aggregate`.
30623051
p
30633052

3064-
case p @ Project(projectList, child) if canRewriteGenerator(projectList) &&
3065-
projectList.exists(hasGenerator) =>
3053+
// The star will be expanded differently if we insert `Generate` under `Project` too early.
3054+
case p @ Project(projectList, child) if !projectList.exists(_.exists(_.isInstanceOf[Star])) =>
30663055
val (resolvedGenerator, newProjectList) = projectList
30673056
.map(trimNonTopLevelAliases)
30683057
.foldLeft((None: Option[Generate], Nil: Seq[NamedExpression])) { (res, e) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
406406
// Lateral column alias does not have qualifiers. We always use the first name part to
407407
// look up lateral column aliases.
408408
val lowerCasedName = u.nameParts.head.toLowerCase(Locale.ROOT)
409-
aliasMap.get(lowerCasedName).map {
409+
aliasMap.get(lowerCasedName).filter {
410+
// Do not resolve LCA with aliased `Generator`, as it will be rewritten by the rule
411+
// `ExtractGenerator` with fresh output attribute IDs. The `Generator` will be pulled
412+
// out and put in a `Generate` node below `Project`, so that we can resolve the column
413+
// normally without LCA resolution.
414+
case scala.util.Left(alias) => !alias.child.isInstanceOf[Generator]
415+
case _ => true
416+
}.map {
410417
case scala.util.Left(alias) =>
411418
if (alias.resolved) {
412419
val resolvedAttr = resolveExpressionByPlanOutput(

sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala

Lines changed: 16 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,40 +1366,21 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
13661366
sql("select 1 as a, a").queryExecution.assertAnalyzed()
13671367
}
13681368

1369-
test("SPARK-49349: Improve error message for LCA with Generate") {
1370-
checkError(
1371-
exception = intercept[AnalysisException] {
1372-
sql(
1373-
s"""
1374-
|SELECT
1375-
| explode(split(name , ',')) AS new_name,
1376-
| new_name like 'a%'
1377-
|FROM $testTable
1378-
|""".stripMargin)
1379-
},
1380-
condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
1381-
sqlState = "0A000",
1382-
parameters = Map(
1383-
"lca" -> "`new_name`",
1384-
"generatorExpr" -> "\"unresolvedalias(lateralAliasReference(new_name) LIKE a%)\""))
1385-
1386-
checkError(
1387-
exception = intercept[AnalysisException] {
1388-
sql(
1389-
s"""
1390-
|SELECT
1391-
| explode_outer(from_json(name,'array<struct<values:string>>')) as newName,
1392-
| size(from_json(newName.values,'array<string>')) +
1393-
| size(array(from_json(newName.values,'map<string,string>'))) as size
1394-
|FROM $testTable
1395-
|""".stripMargin)
1396-
},
1397-
condition = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
1398-
sqlState = "0A000",
1399-
parameters = Map(
1400-
"lca" -> "`newName.values`",
1401-
"generatorExpr" -> ("\"(size(from_json(lateralAliasReference(newName.values), " +
1402-
"array<string>)) + size(array(from_json(lateralAliasReference(newName.values), " +
1403-
"map<string,string>)))) AS size\"")))
1369+
test("LateralColumnAlias with Generate") {
1370+
checkAnswer(
1371+
sql("WITH cte AS (SELECT EXPLODE(ARRAY(1, 2, 3)) AS c1, c1) SELECT * FROM cte"),
1372+
Row(1, 1) :: Row(2, 2) :: Row(3, 3) :: Nil
1373+
)
1374+
checkAnswer(
1375+
sql(
1376+
s"""
1377+
|SELECT
1378+
| explode(split(name , ',')) AS new_name,
1379+
| new_name like 'a%'
1380+
|FROM $testTable
1381+
|""".stripMargin),
1382+
Row("alex", true) :: Row("amy", true) :: Row("cathy", false) ::
1383+
Row("david", false) :: Row("jen", false) :: Nil
1384+
)
14041385
}
14051386
}

0 commit comments

Comments
 (0)