
Commit aa7bbc1

BryanCutler authored and squito committed
[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer)
Latest changes after refactoring to the RPC layer. I rebased against trunk to make sure to get any recent changes since it had been a while. I wasn't crazy about the name `ConfigureTimeout` and `RpcTimeout` seemed to fit better, but I'm open to suggestions! I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources". I think it's just my machine, so I thought I would push what I have anyway.

Still left to do:
* I only added a couple of unit tests so far; there are probably some more cases to test
* Make sure all uses require an `RpcTimeout`
* Right now, both the `ask` and `Await.result` use the same timeout; should we differentiate between these in the TimeoutException message?
* I wrapped `Await.result` in `RpcTimeout`; should we also wrap `Await.ready`?
* Proper scoping of classes and methods

hardmettle, feel free to help out with any of these!

Author: Bryan Cutler <bjcutler@us.ibm.com>
Author: Harsh Gupta <harsh@Harshs-MacBook-Pro.local>
Author: BryanCutler <cutlerb@gmail.com>

Closes apache#6205 from BryanCutler/configTimeout-6980 and squashes the following commits:

46c8d48 [Bryan Cutler] [SPARK-6980] Changed RpcEnvSuite test to never reply instead of just sleeping, to avoid possible sync issues
06afa53 [Bryan Cutler] [SPARK-6980] RpcTimeout class extends Serializable, was causing error in MasterSuite
7bb70f1 [Bryan Cutler] Merge branch 'master' into configTimeout-6980
dbd5f73 [Bryan Cutler] [SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scope to private[spark] and improved deprecation warning msg
4e89c75 [Bryan Cutler] [SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in YarnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
6a1c50d [Bryan Cutler] [SPARK-6980] Minor cleanup of test case
7f4d78e [Bryan Cutler] [SPARK-6980] Fixed scala style checks
287059a [Bryan Cutler] [SPARK-6980] Removed extra import in AkkaRpcEnvSuite
3d8b1ff [Bryan Cutler] [SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
3a168c7 [Bryan Cutler] [SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
7636189 [Bryan Cutler] [SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTimeout - this was being compiled by auto-tupling and changing the message type of BlockManagerHeartbeat
be11c4e [Bryan Cutler] Merge branch 'master' into configTimeout-6980
039afed [Bryan Cutler] [SPARK-6980] Corrected import organization
218aa50 [Bryan Cutler] [SPARK-6980] Corrected issues from feedback
fadaf6f [Bryan Cutler] [SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTimeout to fix MiMa errors
fa6ed82 [Bryan Cutler] [SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger a Future TimeoutException
b05d449 [Bryan Cutler] [SPARK-6980] Changed constructor to use val duration instead of getter function, changed name of string property from conf to timeoutProp for consistency
c6cfd33 [Bryan Cutler] [SPARK-6980] Changed UT ask message timeout to explicitly intercept a SparkException
1394de6 [Bryan Cutler] [SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
1517721 [Bryan Cutler] [SPARK-6980] RpcTimeout object scope should be private[spark]
2206b4d [Bryan Cutler] [SPARK-6980] Added unit test for ask then immediate awaitReply
1b9beab [Bryan Cutler] [SPARK-6980] Cleaned up import ordering
08f5afc [Bryan Cutler] [SPARK-6980] Added UT for constructing RpcTimeout with default value
d3754d1 [Bryan Cutler] [SPARK-6980] Added akkaConf to prevent dead letter logging
995d196 [Bryan Cutler] [SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback
7774d56 [Bryan Cutler] [SPARK-6980] Cleaned up UT imports
4351c48 [Bryan Cutler] [SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
1607a5f [Bryan Cutler] [SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup from PR comments
2f94095 [Bryan Cutler] [SPARK-6980] Added addMessageIfTimeout for when a Future is completed with TimeoutException
235919b [Bryan Cutler] [SPARK-6980] Resolved conflicts after master merge
c07d05c [Bryan Cutler] Merge branch 'master' into configTimeout-6980-tmp
b7fb99f [BryanCutler] Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
4be3a8d [Harsh Gupta] Modifying loop condition to find property match
0ee5642 [Harsh Gupta] Changing the loop condition to halt at the first match in the property list for RpcEnv exception catch
f74064d [Harsh Gupta] Retrieving properties from property list using iterator and while loop instead of chained functions
a294569 [Bryan Cutler] [SPARK-6980] Added creation of RpcTimeout with Seq of property keys
23d2f26 [Bryan Cutler] [SPARK-6980] Fixed await result not being handled by RpcTimeout
49f9f04 [Bryan Cutler] [SPARK-6980] Minor cleanup and scala style fix
5b59a44 [Bryan Cutler] [SPARK-6980] Added some RpcTimeout unit tests
78a2c0a [Bryan Cutler] [SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
97523e0 [Bryan Cutler] [SPARK-6980] Akka ask timeout description refactored to RPC layer
1 parent d983819 commit aa7bbc1
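
For a quick sense of the user-visible change: a blocking wait that exceeds its limit now raises an RpcTimeoutException whose message names the controlling configuration property. A minimal sketch against the API added in this commit (note: `RpcTimeout` is `private[spark]`, so this only compiles inside the Spark source tree; the conf value is illustrative):

    package org.apache.spark.rpc

    import scala.concurrent.Promise
    import org.apache.spark.SparkConf

    object TimeoutMessageSketch extends App {
      val conf = new SparkConf().set("spark.rpc.askTimeout", "2s")
      // Factory from this patch: look up the key in the conf, fall back to the default
      val timeout = RpcTimeout(conf, "spark.rpc.askTimeout", "120s")
      val neverReplied = Promise[String]().future  // simulates an endpoint that never answers
      try timeout.awaitResult(neverReplied)
      catch { case e: RpcTimeoutException => println(e.getMessage) }
      // Futures timed out after [2 seconds]. This timeout is controlled by spark.rpc.askTimeout
    }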

File tree: 11 files changed (+258, -47 lines)


core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
-  private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
+  private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
 
   initialize()
 

core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala

Lines changed: 10 additions & 7 deletions
@@ -17,8 +17,7 @@
 
 package org.apache.spark.rpc
 
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
 import scala.reflect.ClassTag
 
 import org.apache.spark.util.RpcUtils
@@ -32,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
 
   private[this] val maxRetries = RpcUtils.numRetries(conf)
   private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
-  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
 
   /**
    * return the address for the [[RpcEndpointRef]]
@@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    *
    * This method only sends the message once and never retries.
    */
-  def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
 
   /**
    * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
@@ -91,15 +90,15 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
-  def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
     var lastException: Exception = null
     while (attempts < maxRetries) {
       attempts += 1
       try {
         val future = ask[T](message, timeout)
-        val result = Await.result(future, timeout)
+        val result = timeout.awaitResult(future)
         if (result == null) {
           throw new SparkException("Actor returned null")
         }
@@ -110,10 +109,14 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
         lastException = e
         logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
       }
-      Thread.sleep(retryWaitMs)
+
+      if (attempts < maxRetries) {
+        Thread.sleep(retryWaitMs)
+      }
     }
 
     throw new SparkException(
       s"Error sending message [message = $message]", lastException)
   }
+
 }

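Aside from the signature change, the retry loop above now sleeps only between attempts rather than after the final one. A standalone sketch of the same pattern, with generic hypothetical names, for clarity:

    object RetrySketch {
      // Mirrors the reworked askWithRetry loop (illustrative only, not Spark code).
      def retryWithWait[T](maxRetries: Int, retryWaitMs: Long)(attempt: () => T): T = {
        var attempts = 0
        var lastException: Exception = null
        while (attempts < maxRetries) {
          attempts += 1
          try {
            return attempt()
          } catch {
            case e: Exception => lastException = e
          }
          // The fix in this hunk: only sleep if another attempt will follow
          if (attempts < maxRetries) {
            Thread.sleep(retryWaitMs)
          }
        }
        throw new RuntimeException(s"Failed after $attempts attempts", lastException)
      }
    }
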
core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 109 additions & 3 deletions
@@ -18,8 +18,10 @@
 package org.apache.spark.rpc
 
 import java.net.URI
+import java.util.concurrent.TimeoutException
 
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Awaitable, Await, Future}
+import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -66,7 +68,7 @@
  */
 private[spark] abstract class RpcEnv(conf: SparkConf) {
 
-  private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
+  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -94,7 +96,7 @@
    * Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
    */
   def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
-    Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
+    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
   }
 
   /**
@@ -184,3 +186,107 @@ private[spark] object RpcAddress {
     RpcAddress(host, port)
   }
 }
+
+
+/**
+ * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
+ */
+private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+  extends TimeoutException(message) { initCause(cause) }
+
+
+/**
+ * Associates a timeout with a description so that when a TimeoutException occurs, additional
+ * context about the timeout can be amended to the exception message.
+ * @param duration timeout duration in seconds
+ * @param timeoutProp the configuration property that controls this timeout
+ */
+private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
+  extends Serializable {
+
+  /** Amends the standard message of TimeoutException to include the description */
+  private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
+    new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te)
+  }
+
+  /**
+   * PartialFunction to match a TimeoutException and add the timeout description to the message
+   *
+   * @note This can be used in the recover callback of a Future to add to a TimeoutException
+   * Example:
+   *    val timeout = new RpcTimeout(5 millis, "short timeout")
+   *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
+   */
+  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+    // The exception has already been converted to a RpcTimeoutException so just raise it
+    case rte: RpcTimeoutException => throw rte
+    // Any other TimeoutException gets converted to a RpcTimeoutException with modified message
+    case te: TimeoutException => throw createRpcTimeoutException(te)
+  }
+
+  /**
+   * Wait for the completed result and return it. If the result is not available within this
+   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
+   * @param awaitable the `Awaitable` to be awaited
+   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+   *         is still not ready
+   */
+  def awaitResult[T](awaitable: Awaitable[T]): T = {
+    try {
+      Await.result(awaitable, duration)
+    } catch addMessageIfTimeout
+  }
+}
+
+
+private[spark] object RpcTimeout {
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @throws NoSuchElementException if property is not set
+   */
+  def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
+    new RpcTimeout(timeout, timeoutProp)
+  }
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * Uses the given default value if property is not set
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @param defaultValue default timeout value in seconds if property not found
+   */
+  def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
+    new RpcTimeout(timeout, timeoutProp)
+  }
+
+  /**
+   * Lookup prioritized list of timeout properties in the configuration
+   * and create a RpcTimeout with the first set property key in the
+   * description.
+   * Uses the given default value if no property is set
+   * @param conf configuration properties containing the timeout
+   * @param timeoutPropList prioritized list of property keys for the timeout in seconds
+   * @param defaultValue default timeout value in seconds if no properties found
+   */
+  def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
+    require(timeoutPropList.nonEmpty)
+
+    // Find the first set property or use the default value with the first property
+    val itr = timeoutPropList.iterator
+    var foundProp: Option[(String, String)] = None
+    while (itr.hasNext && foundProp.isEmpty) {
+      val propKey = itr.next()
+      conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
+    }
+    val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
+    val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
+    new RpcTimeout(timeout, finalProp._1)
+  }
+}

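The `Seq`-taking `apply` above lets callers express fallback chains such as `spark.rpc.askTimeout` falling back to `spark.network.timeout`: the first key that is actually set supplies both the value and the name reported in a timeout message. A small sketch (again only compilable in-package since `RpcTimeout` is `private[spark]`; the key names follow Spark's conventions, the values are illustrative):

    package org.apache.spark.rpc

    import org.apache.spark.SparkConf

    object FallbackSketch extends App {
      val conf = new SparkConf().set("spark.network.timeout", "100s")
      // spark.rpc.askTimeout is unset, so the second key wins and is what gets reported
      val t = RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
      println(t.duration)     // 100 seconds
      println(t.timeoutProp)  // spark.network.timeout
    }
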
core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 9 additions & 6 deletions
@@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -214,8 +213,11 @@
 
   override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
     import actorSystem.dispatcher
-    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
-      map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
+    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
+      map(new AkkaRpcEndpointRef(defaultAddress, _, conf)).
+      // in case there is a timeout from creating the future in resolveOne, we want
+      // the exception to indicate the conf that determines the timeout
+      recover(defaultLookupTimeout.addMessageIfTimeout)
   }
 
   override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = {
@@ -295,8 +297,8 @@
     actorRef ! AkkaMessage(message, false)
   }
 
-  override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
-    actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
       // The function will run in the calling thread, so it should be short and never block.
       case msg @ AkkaMessage(message, reply) =>
         if (reply) {
@@ -307,7 +309,8 @@
         }
       case AkkaFailure(e) =>
         Future.failed(e)
-    }(ThreadUtils.sameThread).mapTo[T]
+    }(ThreadUtils.sameThread).mapTo[T].
+      recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"

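Both `recover` calls above are given `ThreadUtils.sameThread` so the message rewrite runs inline on the completing thread instead of hopping to another executor. For readers unfamiliar with that helper, a rough stand-in (this is an approximation, not Spark's actual implementation):

    import scala.concurrent.ExecutionContext

    // Approximation of a same-thread ExecutionContext: callbacks run inline,
    // on whichever thread completes the Future, so the rewrite is cheap.
    object SameThreadSketch {
      val sameThread: ExecutionContext = new ExecutionContext {
        def execute(runnable: Runnable): Unit = runnable.run()
        def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
      }
    }
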
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 1 deletion
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
@@ -188,7 +189,7 @@
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
     blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
-      BlockManagerHeartbeat(blockManagerId), 600 seconds)
+      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
 
   // Called by TaskScheduler when an executor fails.

core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
 
-  private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
+  private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 7 additions & 7 deletions
@@ -33,7 +33,7 @@ class BlockManagerMaster(
     isDriver: Boolean)
   extends Logging {
 
-  val timeout = RpcUtils.askTimeout(conf)
+  val timeout = RpcUtils.askRpcTimeout(conf)
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
@@ -106,7 +106,7 @@
       logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -118,7 +118,7 @@
       logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -132,7 +132,7 @@
       s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -176,8 +176,8 @@
       CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
         Option[BlockStatus],
         Iterable[Option[BlockStatus]]]]
-    val blockStatus = Await.result(
-      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+    val blockStatus = timeout.awaitResult(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
     if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
@@ -199,7 +199,7 @@
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
     val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
-    Await.result(future, timeout)
+    timeout.awaitResult(future)
   }
 
   /**

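A sketch of the blocking-flag pattern used by the `remove*` methods above, with stand-in names for the block-manager futures: the failure callback always logs, but only the blocking path waits, and that wait now reports its conf key on timeout:

    package org.apache.spark.rpc

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration._

    object BlockingRemoveSketch extends App {
      implicit val ec = ExecutionContext.global
      val timeout = new RpcTimeout(30.seconds, "spark.rpc.askTimeout")

      def removeSomething(blocking: Boolean): Unit = {
        val future = Future { true }  // stand-in for asking the driver endpoint
        future.onFailure { case e => println(s"Failed to remove - ${e.getMessage}") }
        if (blocking) {
          timeout.awaitResult(future)  // throws RpcTimeoutException naming the key if it expires
        }
      }

      removeSomething(blocking = true)
    }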