Skip to content

Commit

Permalink
[SPARK-32237][SQL][3.0] Resolve hint in CTE
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
The backport of #29062

This PR is to move `Substitution` rule before `Hints` rule in `Analyzer` to avoid hint in CTE not working.

### Why are the changes needed?
Below SQL in Spark3.0 will throw AnalysisException, but it works in Spark2.x
```sql
WITH cte AS (SELECT /*+ REPARTITION(3) */ T.id, T.data FROM $t1 T)
SELECT cte.id, cte.data FROM cte
```
```
Failed to analyze query: org.apache.spark.sql.AnalysisException: cannot resolve '`cte.id`' given input columns: [cte.data, cte.id]; line 3 pos 7;
'Project ['cte.id, 'cte.data]
+- SubqueryAlias cte
   +- Project [id#21L, data#22]
      +- SubqueryAlias T
         +- SubqueryAlias testcat.ns1.ns2.tbl
            +- RelationV2[id#21L, data#22] testcat.ns1.ns2.tbl

'Project ['cte.id, 'cte.data]
+- SubqueryAlias cte
   +- Project [id#21L, data#22]
      +- SubqueryAlias T
         +- SubqueryAlias testcat.ns1.ns2.tbl
            +- RelationV2[id#21L, data#22] testcat.ns1.ns2.tbl
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a unit test

Closes #29201 from LantaoJin/SPARK-32237_branch-3.0.

Lead-authored-by: LantaoJin <jinlantao@gmail.com>
Co-authored-by: Alan Jin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
LantaoJin authored and cloud-fan committed Jul 24, 2020
1 parent ebac47b commit be1b282
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,16 @@ class Analyzer(
val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveJoinStrategyHints(conf),
new ResolveHints.ResolveCoalesceHints(conf)),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
Batch("Hints", fixedPoint,
new ResolveHints.ResolveJoinStrategyHints(conf),
new ResolveHints.ResolveCoalesceHints(conf)),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveNamespace(catalogManager) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.log4j.Level
import org.scalatest.Matchers

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand Down Expand Up @@ -879,4 +879,27 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Seq("Intersect can only be performed on tables with the compatible column types. " +
"timestamp <> double at the second column of the second table"))
}

test("SPARK-32237: Hint in CTE") {
val plan = With(
Project(
Seq(UnresolvedAttribute("cte.a")),
UnresolvedRelation(TableIdentifier("cte"))
),
Seq(
(
"cte",
SubqueryAlias(
AliasIdentifier("cte"),
UnresolvedHint(
"REPARTITION",
Seq(Literal(3)),
Project(testRelation.output, testRelation)
)
)
)
)
)
assertAnalysisSuccess(plan)
}
}
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,18 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}

test("SPARK-32237: Hint in CTE") {
withTable("t") {
sql("CREATE TABLE t USING PARQUET AS SELECT 1 AS id")
checkAnswer(
sql("""
|WITH cte AS (SELECT /*+ REPARTITION(3) */ * FROM t)
|SELECT * FROM cte
""".stripMargin),
Row(1) :: Nil)
}
}

test("SPARK-32372: ResolveReferences.dedupRight should only rewrite attributes for ancestor " +
"plans of the conflict plan") {
sql("SELECT name, avg(age) as avg_age FROM person GROUP BY name")
Expand Down

0 comments on commit be1b282

Please sign in to comment.