Skip to content

Commit 0034172

Browse files
author
Davies Liu
committed
move wait subqueries into execute()/produce()
1 parent 7596173 commit 0034172

File tree

4 files changed

+49
-41
lines changed

4 files changed

+49
-41
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,44 +115,59 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
115115
final def execute(): RDD[InternalRow] = {
116116
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
117117
prepare()
118+
waitForSubqueries()
118119
doExecute()
119120
}
120121
}
121122

123+
// All the subqueries and the Futures of their results.
124+
@transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
125+
126+
/**
127+
* Collects all the subqueries and creates a Future for each one to take its first two rows.
128+
*/
129+
protected def prepareSubqueries(): Unit = {
130+
val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
131+
allSubqueries.foreach { e =>
132+
val futureResult = Future {
133+
// We only need the first row, try to take two rows so we can throw an exception if there
134+
// is more than one row returned.
135+
e.executedPlan.executeTake(2)
136+
}(SparkPlan.subqueryExecutionContext)
137+
queryResults += e -> futureResult
138+
}
139+
}
140+
141+
/**
142+
* Waits for all the subqueries to finish and updates the results.
143+
*/
144+
protected def waitForSubqueries(): Unit = {
145+
// fill in the result of subqueries
146+
queryResults.foreach {
147+
case (e, futureResult) =>
148+
val rows = Await.result(futureResult, Duration.Inf)
149+
if (rows.length > 1) {
150+
sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
151+
}
152+
if (rows.length == 1) {
153+
assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
154+
e.updateResult(rows(0).get(0, e.dataType))
155+
} else {
156+
// No rows were returned, so the result should be null.
157+
e.updateResult(null)
158+
}
159+
}
160+
queryResults.clear()
161+
}
162+
122163
/**
123164
* Prepare a SparkPlan for execution. It's idempotent.
124165
*/
125166
final def prepare(): Unit = {
126167
if (prepareCalled.compareAndSet(false, true)) {
127168
doPrepare()
128-
129-
// collect all the subqueries and submit jobs to execute them in background
130-
val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
131-
val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
132-
allSubqueries.foreach { e =>
133-
val futureResult = Future {
134-
e.plan.executeTake(2)
135-
}(SparkPlan.subqueryExecutionContext)
136-
queryResults += e -> futureResult
137-
}
138-
169+
prepareSubqueries()
139170
children.foreach(_.prepare())
140-
141-
// fill in the result of subqueries
142-
queryResults.foreach {
143-
case (e, futureResult) =>
144-
val rows = Await.result(futureResult, Duration.Inf)
145-
if (rows.length > 1) {
146-
sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}")
147-
}
148-
if (rows.length == 1) {
149-
assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column")
150-
e.updateResult(rows(0).get(0, e.dataType))
151-
} else {
152-
// There is no rows returned, the result should be null.
153-
e.updateResult(null)
154-
}
155-
}
156171
}
157172
}
158173

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ trait CodegenSupport extends SparkPlan {
7373
/**
7474
* Returns Java source code to process the rows from upstream.
7575
*/
76-
def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
76+
final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
7777
this.parent = parent
7878
ctx.freshNamePrefix = variablePrefix
79+
waitForSubqueries()
7980
doProduce(ctx)
8081
}
8182

@@ -101,7 +102,7 @@ trait CodegenSupport extends SparkPlan {
101102
/**
102103
* Consumes the columns generated from the current SparkPlan and calls its parent.
103104
*/
104-
def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
105+
final def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
105106
if (input != null) {
106107
assert(input.length == output.length)
107108
}

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,6 @@ case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
355355
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
356356

357357
protected override def doExecute(): RDD[InternalRow] = {
358-
child.execute()
358+
throw new UnsupportedOperationException
359359
}
360360
}

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.sql.SQLContext
21+
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
22+
import org.apache.spark.sql.catalyst.expressions.{ExprId, Literal, SubqueryExpression}
2123
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
22-
import org.apache.spark.sql.catalyst.expressions.{ExprId, SubqueryExpression}
2324
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
2425
import org.apache.spark.sql.catalyst.rules.Rule
25-
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
2626
import org.apache.spark.sql.types.DataType
2727

2828
/**
@@ -55,15 +55,7 @@ case class ScalarSubquery(
5555
override def eval(input: InternalRow): Any = result
5656

5757
override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
58-
val thisTerm = ctx.addReferenceObj("subquery", this)
59-
val isNull = ctx.freshName("isNull")
60-
ctx.addMutableState("boolean", isNull, s"$isNull = $thisTerm.eval(null) == null;")
61-
val value = ctx.freshName("value")
62-
ctx.addMutableState(ctx.javaType(dataType), value,
63-
s"$value = (${ctx.boxedType(dataType)}) $thisTerm.eval(null);")
64-
ev.isNull = isNull
65-
ev.value = value
66-
""
58+
Literal.create(result, dataType).genCode(ctx, ev)
6759
}
6860
}
6961

0 commit comments

Comments
 (0)