Skip to content

Commit b7fb99f

Browse files
committed
Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
[SPARK-6980] [CORE] [WIP] Creating wrapper for Akka timeout exceptions to get better information using conf (RPC Layer)
2 parents a294569 + 4be3a8d commit b7fb99f

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,17 @@ object RpcTimeout {
264264
require(timeoutPropList.nonEmpty)
265265

266266
// Find the first set property or use the default value with the first property
267-
val foundProp = timeoutPropList.view.map(x => (x, conf.getOption(x))).filter(_._2.isDefined).
268-
map(y => (y._1, y._2.get)).headOption.getOrElse(timeoutPropList.head, defaultValue)
269-
270-
val timeout = { Utils.timeStringAsSeconds(foundProp._2) seconds }
271-
new RpcTimeout(timeout, messagePrefix + foundProp._1)
267+
val itr = timeoutPropList.iterator
268+
var foundProp = None: Option[(String, String)]
269+
while (itr.hasNext && foundProp.isEmpty){
270+
val propKey = itr.next()
271+
conf.getOption(propKey) match {
272+
case Some(prop) => foundProp = Some(propKey,prop)
273+
case None =>
274+
}
275+
}
276+
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
277+
val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
278+
new RpcTimeout(timeout, messagePrefix + finalProp._1)
272279
}
273280
}

0 commit comments

Comments
 (0)