Commit 79c6689

MaxGekk authored and hvanhovell committed
[SPARK-24757][SQL] Improving the error message for broadcast timeouts
## What changes were proposed in this pull request?

In this PR, I propose giving users a tip on how to resolve timeout expiration for broadcast joins. In particular, they can increase the timeout via **spark.sql.broadcastTimeout** or disable broadcast joins entirely by setting **spark.sql.autoBroadcastJoinThreshold** to `-1`.

## How was this patch tested?

It was tested manually from `spark-shell`:

```
scala> spark.conf.set("spark.sql.broadcastTimeout", 1)
scala> val df = spark.range(100).join(spark.range(15).as[Long].map { x =>
     |   Thread.sleep(5000)
     |   x
     | }).where("id = value")
scala> df.count()
```

```
org.apache.spark.SparkException: Could not execute broadcast in 1 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
```

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21727 from MaxGekk/broadcast-timeout-error.
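
For readers who hit this error, the two remedies named in the message could be applied roughly as follows. This is a minimal sketch, not part of the patch: the local session, the application name, and the value `600` are assumptions chosen purely for illustration.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastSettingsSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local session created only for this illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-timeout-sketch")
      .getOrCreate()

    // Remedy 1: raise the broadcast timeout (in seconds). 600 is an arbitrary example value.
    spark.conf.set("spark.sql.broadcastTimeout", 600)

    // Remedy 2: disable automatic broadcast joins altogether.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    spark.stop()
  }
}
```

In practice only one of the two settings is usually needed; which one fits depends on whether the broadcast side is genuinely small but slow to compute, or simply too large to broadcast.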
1 parent 044b33b commit 79c6689

1 file changed: +12 -1 lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 12 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.concurrent.TimeoutException
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
@@ -140,7 +142,16 @@ case class BroadcastExchangeExec(
   }
 
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    try {
+      ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    } catch {
+      case ex: TimeoutException =>
+        logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+        throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
+          s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
+          s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
+          ex)
+    }
   }
 }
 
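
The pattern in this change (catch the timeout from a blocking wait, then rethrow with a message naming the setting to tune) can be tried outside Spark. The sketch below uses only the Scala standard library: `Await.result` stands in for `ThreadUtils.awaitResult`, and `HintedTimeoutException`, `awaitWithHint`, and `settingKey` are hypothetical names introduced for illustration, not Spark APIs.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

object TimeoutHintDemo {
  // Hypothetical stand-in for SparkException, so the sketch needs no Spark dependency.
  final class HintedTimeoutException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)

  // Blocks until `future` completes. On timeout, rethrows with a message that
  // names the setting to adjust and keeps the original exception as the cause.
  def awaitWithHint[T](future: Future[T], timeout: FiniteDuration, settingKey: String): T = {
    try {
      Await.result(future, timeout)
    } catch {
      case ex: TimeoutException =>
        throw new HintedTimeoutException(
          s"Could not finish in ${timeout.toSeconds} secs. " +
            s"You can increase the timeout via $settingKey",
          ex)
    }
  }
}
```

Passing the original exception as the cause, as the patch does, preserves the underlying stack trace while the new message gives the user something to act on.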