Skip to content

Commit 7dfb8fe

Browse files
dusantism-dbcloud-fan
authored andcommitted
Revert #48786 - "Revert "[SPARK-48273][SQL] Fix late rewrite of PlanWithUnresolvedIdentifier""
### What changes were proposed in this pull request? This PR reverts #48786 ### Why are the changes needed? Custom rules in the early analyzer batches (custom hint resolution rules injection) can match `UnresolvedRelation`, which is only visible after the IDENTIFIER clause is expanded. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests + added a unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50311 from dusantism-db/identifier-revert. Authored-by: Dušan Tišma <dusan.tisma@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent aaa94c4 commit 7dfb8fe

File tree

4 files changed

+46
-7
lines changed

4 files changed

+46
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
336336
TypeCoercion.typeCoercionRules
337337
}
338338

339-
override def batches: Seq[Batch] = Seq(
339+
private def earlyBatches: Seq[Batch] = Seq(
340340
Batch("Substitution", fixedPoint,
341341
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
342342
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
@@ -357,7 +357,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
357357
Batch("Simple Sanity Check", Once,
358358
LookupFunctions),
359359
Batch("Keep Legacy Outputs", Once,
360-
KeepLegacyOutputs),
360+
KeepLegacyOutputs)
361+
)
362+
363+
override def batches: Seq[Batch] = earlyBatches ++ Seq(
361364
Batch("Resolution", fixedPoint,
362365
new ResolveCatalogs(catalogManager) ::
363366
ResolveInsertInto ::
@@ -409,7 +412,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
409412
ResolveTimeZone ::
410413
ResolveRandomSeed ::
411414
ResolveBinaryArithmetic ::
412-
ResolveIdentifierClause ::
415+
new ResolveIdentifierClause(earlyBatches) ::
413416
ResolveUnion ::
414417
ResolveRowLevelCommandAssignments ::
415418
MoveParameterizedQueriesDown ::

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,24 @@ package org.apache.spark.sql.catalyst.analysis
2020
import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
2121
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
2222
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23-
import org.apache.spark.sql.catalyst.rules.Rule
23+
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
2424
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
2525
import org.apache.spark.sql.types.StringType
2626

2727
/**
2828
* Resolves the identifier expressions and builds the original plans/expressions.
2929
*/
30-
object ResolveIdentifierClause extends Rule[LogicalPlan] with AliasHelper with EvalHelper {
30+
class ResolveIdentifierClause(earlyBatches: Seq[RuleExecutor[LogicalPlan]#Batch])
31+
extends Rule[LogicalPlan] with AliasHelper with EvalHelper {
32+
33+
private val executor = new RuleExecutor[LogicalPlan] {
34+
override def batches: Seq[Batch] = earlyBatches.asInstanceOf[Seq[Batch]]
35+
}
3136

3237
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
3338
_.containsPattern(UNRESOLVED_IDENTIFIER)) {
3439
case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved && p.childrenResolved =>
35-
p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr), p.children)
40+
executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr), p.children))
3641
case other =>
3742
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
3843
case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
142142
override val maxIterationsSetting: String = null) extends Strategy
143143

144144
/** A batch of rules. */
145-
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
145+
protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
146146

147147
/** Defines a sequence of rule batches, to be overridden by the implementation. */
148148
protected def batches: Seq[Batch]

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.spark.broadcast.Broadcast
2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.rdd.RDD
2828
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
29+
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2930
import org.apache.spark.sql.catalyst.catalog.BucketSpec
3031
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
3132
import org.apache.spark.sql.catalyst.expressions._
@@ -76,6 +77,14 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
7677
}
7778
}
7879

80+
private def withTable(spark: SparkSession, tableNames: String*)(f: => Unit): Unit = {
81+
try f finally {
82+
tableNames.foreach { name =>
83+
spark.sql(s"DROP TABLE IF EXISTS $name")
84+
}
85+
}
86+
}
87+
7988
test("inject analyzer rule") {
8089
withSession(Seq(_.injectResolutionRule(MyRule))) { session =>
8190
assert(session.sessionState.analyzer.extendedResolutionRules.contains(MyRule(session)))
@@ -571,6 +580,28 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
571580
assert(res.collect {case s: Sort => s}.isEmpty)
572581
}
573582
}
583+
584+
test("early batch rule is applied on resolved IDENTIFIER") {
585+
var ruleApplied = false
586+
587+
case class UnresolvedRelationRule(spark: SparkSession) extends Rule[LogicalPlan] {
588+
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
589+
case r: UnresolvedRelation =>
590+
ruleApplied = true
591+
r
592+
}
593+
}
594+
595+
withSession(Seq(_.injectHintResolutionRule(UnresolvedRelationRule))) { session =>
596+
withTable(session, "my_table") {
597+
session.sql("CREATE TABLE IF NOT EXISTS my_table (col1 INT)")
598+
ruleApplied = false
599+
600+
session.sql("SELECT * FROM IDENTIFIER('my_' || 'table')").collect()
601+
assert(ruleApplied)
602+
}
603+
}
604+
}
574605
}
575606

576607
case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {

0 commit comments

Comments
 (0)