Skip to content

Commit 0d0b3f3

Browse files
committed
Improving the error message for broadcast timeouts
I added a recommendation for increasing broadcast timeout. This sentence is added to existing error message: ``` You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} ``` Author: Maxim Gekk <maxim.gekk@databricks.com> Closes #2801 from MaxGekk/broadcast-error-message.
1 parent 044b33b commit 0d0b3f3

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
package org.apache.spark.sql.execution.exchange
1919

20+
import java.util.concurrent.TimeoutException
21+
2022
import scala.concurrent.{ExecutionContext, Future}
2123
import scala.concurrent.duration._
2224
import scala.util.control.NonFatal
2325

2426
import org.apache.spark.{broadcast, SparkException}
27+
2528
import org.apache.spark.launcher.SparkLauncher
2629
import org.apache.spark.rdd.RDD
2730
import org.apache.spark.sql.catalyst.InternalRow
@@ -140,7 +143,16 @@ case class BroadcastExchangeExec(
140143
}
141144

142145
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
143-
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
146+
try {
147+
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
148+
} catch {
149+
case ex: TimeoutException =>
150+
logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
151+
throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
152+
s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
153+
s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
154+
ex)
155+
}
144156
}
145157
}
146158

0 commit comments

Comments
 (0)