Skip to content

Commit 016c36c

Browse files
author
Davies Liu
committed
use broadcastTimeout
1 parent 236ac88 commit 016c36c

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2121

2222
import scala.collection.mutable.ArrayBuffer
2323
import scala.concurrent.{Await, ExecutionContext, Future}
24-
import scala.concurrent.duration.Duration
24+
import scala.concurrent.duration._
2525

2626
import org.apache.spark.Logging
2727
import org.apache.spark.rdd.{RDD, RDDOperationScope}
@@ -139,10 +139,19 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
139139

140140
children.foreach(_.prepare())
141141

142+
val timeout: Duration = {
143+
val timeoutValue = sqlContext.conf.broadcastTimeout
144+
if (timeoutValue < 0) {
145+
Duration.Inf
146+
} else {
147+
timeoutValue.seconds
148+
}
149+
}
150+
142151
// fill in the result of subqueries
143152
queryResults.foreach {
144153
case (e, futureResult) =>
145-
val rows = Await.result(futureResult, Duration.Inf)
154+
val rows = Await.result(futureResult, timeout)
146155
if (rows.length > 1) {
147156
sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
148157
s"${e.query.treeString}")

0 commit comments

Comments
 (0)