Skip to content

Commit a294569

Browse files
committed
[SPARK-6980] Added creation of RpcTimeout with Seq of property keys
1 parent 23d2f26 commit a294569

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,24 @@ object RpcTimeout {
250250
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
251251
new RpcTimeout(timeout, messagePrefix + timeoutProp)
252252
}
253+
254+
/**
255+
* Lookup prioritized list of timeout properties in the configuration
256+
* and create a RpcTimeout with the first set property key in the
257+
* description.
258+
* Uses the given default value if property is not set
259+
* @param conf configuration properties containing the timeout
260+
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
261+
* @param defaultValue default timeout value in seconds if no properties found
262+
*/
263+
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
264+
require(timeoutPropList.nonEmpty)
265+
266+
// Find the first set property or use the default value with the first property
267+
val foundProp = timeoutPropList.view.map(x => (x, conf.getOption(x))).filter(_._2.isDefined).
268+
map(y => (y._1, y._2.get)).headOption.getOrElse(timeoutPropList.head, defaultValue)
269+
270+
val timeout = { Utils.timeStringAsSeconds(foundProp._2) seconds }
271+
new RpcTimeout(timeout, messagePrefix + foundProp._1)
272+
}
253273
}

core/src/main/scala/org/apache/spark/util/RpcUtils.scala

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.util
1919

2020
import scala.language.postfixOps
21+
import scala.concurrent.duration._
2122

2223
import org.apache.spark.{SparkEnv, SparkConf}
2324
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
@@ -47,23 +48,13 @@ object RpcUtils {
4748

4849
/** Returns the default Spark timeout to use for RPC ask operations. */
4950
def askTimeout(conf: SparkConf): RpcTimeout = {
50-
try {
51-
RpcTimeout(conf, "spark.rpc.askTimeout")
52-
}
53-
catch {
54-
case _: Throwable =>
55-
RpcTimeout(conf, "spark.network.timeout", "120s")
56-
}
51+
RpcTimeout(conf, Seq("spark.rpc.askTimeout",
52+
"spark.network.timeout"), "120s")
5753
}
5854

5955
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
6056
def lookupTimeout(conf: SparkConf): RpcTimeout = {
61-
try {
62-
RpcTimeout(conf, "spark.rpc.lookupTimeout")
63-
}
64-
catch {
65-
case _: Throwable =>
66-
RpcTimeout(conf, "spark.network.timeout", "120s")
67-
}
57+
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout",
58+
"spark.network.timeout"), "120s")
6859
}
6960
}

0 commit comments

Comments
 (0)