Commit d0974cf

Davies Liu committed:

    improve explain on subquery

1 parent a4bae33

9 files changed, 176 insertions(+), 39 deletions(-)
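
What the change buys: previously a scalar subquery printed as an opaque leaf expression in EXPLAIN output, so the subquery's own plan never appeared; with the generateTreeString override added to QueryPlan below, each subquery plan is rendered as an extra child branch of the node holding the expression. A hypothetical query that exercises this (table and column names are illustrative, not from the commit):

// Hypothetical repro: a scalar subquery in the select list. After this commit,
// explain(true) also shows the plan of (SELECT max(b) FROM t2).
sqlContext.sql("SELECT a + (SELECT max(b) FROM t2) FROM t1").explain(true)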

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 5 additions & 2 deletions

@@ -122,6 +122,7 @@ class Analyzer(
         }
         substituted.getOrElse(u)
       case other =>
+        // This can't be done in ResolveSubquery because that does not know the CTE.
         other transformExpressions {
           case e: SubqueryExpression =>
             e.withNewPlan(substituteCTE(e.query, cteRelations))
@@ -701,8 +702,10 @@ class Analyzer(
   }
 
   /**
-   * This rule resolve subqueries inside expressions.
-   */
+   * This rule resolve subqueries inside expressions.
+   *
+   * Note: CTE are handled in CTESubstitution.
+   */
   object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
 
     private def hasSubquery(e: Expression): Boolean = {
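
The new transformExpressions case matters when a CTE is referenced only from inside a subquery expression: ResolveSubquery never sees the CTE definitions (as the new comment says), so CTESubstitution has to rewrite the plans nested in subquery expressions itself. A hypothetical query that takes this path (names illustrative):

// Hypothetical: `w` exists only as a CTE, so the reference inside the scalar
// subquery can only be resolved by CTESubstitution.
sqlContext.sql(
  """WITH w AS (SELECT 1 AS a)
    |SELECT * FROM t WHERE t.a = (SELECT max(a) FROM w)""".stripMargin)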

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala

Lines changed: 33 additions & 20 deletions

@@ -17,24 +17,45 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
 import org.apache.spark.sql.types.DataType
 
 /**
- * A interface for subquery that is used in expressions.
- */
-trait SubqueryExpression extends LeafExpression {
+ * An interface for subquery that is used in expressions.
+ */
+abstract class SubqueryExpression extends LeafExpression {
+
+  /**
+   * The logical plan of the query.
+   */
   def query: LogicalPlan
+
+  /**
+   * The underline plan for the query, could be logical plan or physical plan.
+   *
+   * This is used to generate tree string.
+   */
+  def plan: QueryPlan[_]
+
+  /**
+   * Updates the query with new logical plan.
+   */
   def withNewPlan(plan: LogicalPlan): SubqueryExpression
 }
 
 /**
- * A subquery that will return only one row and one column.
- */
-case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback {
+ * A subquery that will return only one row and one column.
+ *
+ * This is not evaluable, it should be converted into SparkScalaSubquery.
+ */
+case class ScalarSubquery(
+    query: LogicalPlan,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends SubqueryExpression with Unevaluable {
+
+  override def plan: LogicalPlan = Subquery(toString, query)
 
   override lazy val resolved: Boolean = query.resolved
 
@@ -49,20 +70,12 @@ case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with Co
     }
   }
 
-  // It can not be evaluated by optimizer.
   override def foldable: Boolean = false
   override def nullable: Boolean = true
 
-  override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan)
-
-  // TODO: support sql()
-
-  // the first column in first row from `query`.
-  private var result: Any = null
+  override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId)
 
-  def updateResult(v: Any): Unit = {
-    result = v
-  }
+  override def toString: String = s"subquery#${exprId.id}"
 
-  override def eval(input: InternalRow): Any = result
+  // TODO: support sql()
 }
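
Two design points are worth making explicit. First, ScalarSubquery is now Unevaluable: evaluation moves to the physical SparkScalarSubquery (see the new execution file below), so the mutable result slot no longer lives in the logical expression. Second, the new exprId field gives every subquery a stable display name that survives plan rewrites, since withNewPlan threads the same id through. A small sketch (plan1 and plan2 are hypothetical resolved logical plans):

// Hypothetical values; only the naming behavior is being illustrated.
val sub = ScalarSubquery(plan1)    // allocates a fresh ExprId, e.g. id = 5
sub.toString                       // "subquery#5"
sub.withNewPlan(plan2).toString    // still "subquery#5": withNewPlan reuses exprId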

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 13 additions & 1 deletion

@@ -88,7 +88,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Decimal Optimizations", FixedPoint(100),
       DecimalAggregates) ::
     Batch("LocalRelation", FixedPoint(100),
-      ConvertToLocalRelation) :: Nil
+      ConvertToLocalRelation) ::
+    Batch("Subquery", Once,
+      Subquery) :: Nil
+  }
+
+  /**
+   * Optimize all the subqueries inside expression.
+   */
+  object Subquery extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      case subquery: SubqueryExpression =>
+        subquery.withNewPlan(execute(subquery.query))
+    }
   }
 }
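
Because the rule invokes execute (the optimizer's own entry point) on each nested plan, subqueries are optimized with the complete batch pipeline, and scheduling the batch Once at the end means nested plans are rewritten only after the outer plan has settled. A self-contained toy of that recursion (toy Plan/Expr types, not Spark's classes):

// Minimal model: an "optimizer" that tags every node it rewrites and, like the
// Subquery rule above, recurses into the plans carried by subquery expressions.
object NestedPlanOptimizeSketch {
  case class Plan(name: String, exprs: Seq[Expr], children: Seq[Plan])
  sealed trait Expr
  case class SubqueryExpr(query: Plan) extends Expr

  def execute(plan: Plan): Plan = {
    val exprs = plan.exprs.map {
      case SubqueryExpr(q) => SubqueryExpr(execute(q)) // cf. subquery.withNewPlan(execute(subquery.query))
    }
    Plan(plan.name + "*", exprs, plan.children.map(execute))
  }

  def main(args: Array[String]): Unit = {
    val nested = Plan("Aggregate", Nil, Seq(Plan("Scan t2", Nil, Nil)))
    val root = Plan("Filter", Seq(SubqueryExpr(nested)), Seq(Plan("Scan t1", Nil, Nil)))
    println(execute(root)) // every node, including those inside the subquery, ends up tagged "*"
  }
}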
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 26 additions & 0 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -226,4 +227,29 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
   override def simpleString: String = statePrefix + super.simpleString
+
+  override def generateTreeString(
+      depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = {
+    if (depth > 0) {
+      lastChildren.init.foreach { isLast =>
+        val prefixFragment = if (isLast) "   " else ":  "
+        builder.append(prefixFragment)
+      }
+
+      val branch = if (lastChildren.last) "+- " else ":- "
+      builder.append(branch)
+    }
+
+    builder.append(simpleString)
+    builder.append("\n")
+
+    val allSubqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
+    val allChildren = children ++ allSubqueries.map(e => e.plan)
+    if (allChildren.nonEmpty) {
+      allChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
+      allChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
+    }
+
+    builder
+  }
 }
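
To make the prefix bookkeeping concrete, here is a self-contained re-implementation of the algorithm over a toy node type (node names in main are illustrative). lastChildren records, for each ancestor level, whether that ancestor was the last child at its level; that choice drives the ":  " vs "   " continuation columns:

object TreeStringSketch {
  case class Node(name: String, children: Seq[Node])

  def generateTreeString(
      node: Node, depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = {
    if (depth > 0) {
      // one three-character column per ancestor: continue its branch (":  ")
      // unless that ancestor was the last child at its level ("   ")
      lastChildren.init.foreach { isLast =>
        builder.append(if (isLast) "   " else ":  ")
      }
      builder.append(if (lastChildren.last) "+- " else ":- ")
    }
    builder.append(node.name).append("\n")
    if (node.children.nonEmpty) {
      node.children.init.foreach(generateTreeString(_, depth + 1, lastChildren :+ false, builder))
      generateTreeString(node.children.last, depth + 1, lastChildren :+ true, builder)
    }
    builder
  }

  def main(args: Array[String]): Unit = {
    // Mirrors `children ++ allSubqueries.map(_.plan)`: the subquery plan shows
    // up as an extra (last) child of the node owning the expression.
    val scan = Node("LocalTableScan", Nil)
    val subquery = Node("subquery#1", Seq(Node("Aggregate", Seq(scan))))
    print(generateTreeString(Node("Project", Seq(scan, subquery)), 0, Nil, new StringBuilder))
    // Project
    // :- LocalTableScan
    // +- subquery#1
    //    +- Aggregate
    //       +- LocalTableScan
  }
}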

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 6 additions & 0 deletions

@@ -113,6 +113,12 @@ class AnalysisErrorSuite extends AnalysisTest {
 
   val dateLit = Literal.create(null, DateType)
 
+  errorTest(
+    "invalid scalar subquery",
+    testRelation.select(
+      (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
+    "Scalar subquery can only have 1 column, but got 2" :: Nil)
+
   errorTest(
     "single invalid type, single arg",
     testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)),
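
The new test exercises ScalarSubquery's input type check, which lies outside the hunks shown in this diff. Inferred purely from the expected message, the check presumably looks something like this sketch (not the commit's actual body):

// Hypothetical reconstruction of ScalarSubquery.checkInputDataTypes, inferred
// from the error message asserted above:
override def checkInputDataTypes(): TypeCheckResult = {
  if (query.schema.length != 1) {
    TypeCheckResult.TypeCheckFailure(
      s"Scalar subquery can only have 1 column, but got ${query.schema.length}")
  } else {
    TypeCheckResult.TypeCheckSuccess
  }
}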

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 0 deletions

@@ -884,6 +884,7 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
+      Batch("Subquery", Once, ConvertSubquery(self)),
      Batch("Add exchange", Once, EnsureRequirements(self)),
      Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
    )

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 6 additions & 16 deletions

@@ -25,7 +25,7 @@ import scala.concurrent.duration._
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -127,31 +127,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    doPrepare()

    // collect all the subqueries and submit jobs to execute them in background
-    val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
-    val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
+    val queryResults = ArrayBuffer[(SparkScalarSubquery, Future[Array[InternalRow]])]()
+    val allSubqueries = expressions.flatMap(_.collect {case e: SparkScalarSubquery => e})
    allSubqueries.foreach { e =>
-      val futureResult = scala.concurrent.future {
-        val df = DataFrame(sqlContext, e.query)
-        df.queryExecution.toRdd.collect()
+      val futureResult = Future {
+        e.plan.executeCollect()
      }(SparkPlan.subqueryExecutionContext)
      queryResults += e -> futureResult
    }

    children.foreach(_.prepare())

-    val timeout: Duration = {
-      val timeoutValue = sqlContext.conf.broadcastTimeout
-      if (timeoutValue < 0) {
-        Duration.Inf
-      } else {
-        timeoutValue.seconds
-      }
-    }
-
    // fill in the result of subqueries
    queryResults.foreach {
      case (e, futureResult) =>
-        val rows = Await.result(futureResult, timeout)
+        val rows = Await.result(futureResult, Duration.Inf)
        if (rows.length > 1) {
          sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
            s"${e.query.treeString}")

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 15 additions & 0 deletions

@@ -343,3 +343,18 @@ case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPl
 
   protected override def doExecute(): RDD[InternalRow] = child.execute()
 }
+
+/**
+ * A plan as subquery.
+ *
+ * This is used to generate tree string for SparkScalarSubquery.
+ */
+case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+}

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (new file)

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{ExprId, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.DataType
+
+/**
+ * A subquery that will return only one row and one column.
+ *
+ * This is the physical copy of ScalarSubquery to be used inside SparkPlan.
+ */
+case class SparkScalarSubquery(
+    @transient executedPlan: SparkPlan,
+    exprId: ExprId)
+  extends SubqueryExpression with CodegenFallback {
+
+  override def query: LogicalPlan = throw new UnsupportedOperationException
+  override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
+    throw new UnsupportedOperationException
+  }
+  override def plan: SparkPlan = Subquery(simpleString, executedPlan)
+
+  override def dataType: DataType = executedPlan.schema.fields.head.dataType
+  override def nullable: Boolean = true
+  override def toString: String = s"subquery#${exprId.id}"
+
+  // the first column in first row from `query`.
+  private var result: Any = null
+
+  def updateResult(v: Any): Unit = {
+    result = v
+  }
+
+  override def eval(input: InternalRow): Any = result
+}
+
+/**
+ * Convert the subquery from logical plan into executed plan.
+ */
+private[sql] case class ConvertSubquery(sqlContext: SQLContext) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    plan.transformAllExpressions {
+      // Only scalar subquery will be executed separately, all others will be written as join.
+      case subquery: ScalarSubquery =>
+        val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next()
+        val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan)
+        SparkScalarSubquery(executedPlan, subquery.exprId)
+    }
+  }
+}
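
One consequence of the wiring above: ConvertSubquery prepares each subquery with sqlContext.prepareForExecution, whose first batch (see the SQLContext.scala hunk) is ConvertSubquery itself, so a scalar subquery nested inside another scalar subquery is planned and converted recursively. A hypothetical query (names illustrative) that would take that path:

// Hypothetical: the inner (SELECT min(k) FROM t3) is converted while the plan
// of the outer scalar subquery is being prepared.
sqlContext.sql(
  "SELECT (SELECT max(b) FROM t2 WHERE t2.k = (SELECT min(k) FROM t3)) FROM t1").explain()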
