Skip to content

[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer #5776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class Analyzer(
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
CTESubstitution ::
WindowsSubstitution ::
Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
Expand All @@ -71,6 +75,55 @@ class Analyzer(
extendedResolutionRules : _*)
)

/**
* Substitute child plan with cte definitions
*/
object CTESubstitution extends Rule[LogicalPlan] {
// TODO allow subquery to define CTE
def apply(plan: LogicalPlan): LogicalPlan = plan match {
case With(child, relations) => substituteCTE(child, relations)
case other => other
}

def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
plan transform {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
// see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
case u : UnresolvedRelation =>
val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
val withAlias = u.alias.map(Subquery(_, relation))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
}
}
}

/**
* Substitute child plan with WindowSpecDefinitions.
*/
object WindowsSubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case plan => plan.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
windowDefinitions
.get(windowName)
.getOrElse(failAnalysis(errorMessage))
WindowExpression(c, windowSpecDefinition)
}
}
}
}

/**
* Removes no-op Alias expressions from the plan.
*/
Expand Down Expand Up @@ -172,36 +225,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
// TODO allow subquery to define CTE
// Add cte table to a temp relation map,drop `with` plan and keep its child
case With(child, relations) => (child, relations)
case other => (other, Map.empty[String, LogicalPlan])
}

realPlan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
}
}

Expand Down Expand Up @@ -664,21 +701,6 @@ class Analyzer(
// We have to use transformDown at here to make sure the rule of
// "Aggregate with Having clause" will be triggered.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case plan => plan.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
windowDefinitions
.get(windowName)
.getOrElse(failAnalysis(errorMessage))
WindowExpression(c, windowSpecDefinition)
}
}

// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
Expand Down