Skip to content

Commit e0d80a9

Browse files
committed
Use getTimeAsMs and getTimeAsSeconds and other minor fixes
1 parent 31dbe69 commit e0d80a9

File tree

3 files changed

+14
-12
lines changed

3 files changed

+14
-12
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ private[spark] object SparkConf extends Logging {
432432
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
433433
// Translate old value to a duration, with 10s wait time per try.
434434
translation = s => s"${s.toLong * 10}s")),
435-
"spark.rpc.num.retries" -> Seq(
435+
"spark.rpc.numRetries" -> Seq(
436436
AlternateConfig("spark.akka.num.retries", "1.4")),
437437
"spark.rpc.retry.wait" -> Seq(
438438
AlternateConfig("spark.akka.retry.wait", "1.4")),

core/src/main/scala/org/apache/spark/util/RpcUtils.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ object RpcUtils {
3838

3939
/** Returns the configured number of times to retry connecting */
4040
def numRetries(conf: SparkConf): Int = {
41-
conf.getInt("spark.rpc.num.retries", 3)
41+
conf.getInt("spark.rpc.numRetries", 3)
4242
}
4343

4444
/** Returns the configured number of milliseconds to wait on each retry */
4545
def retryWaitMs(conf: SparkConf): Long = {
46-
conf.getLong("spark.rpc.retry.wait", 3000)
46+
conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
4747
}
4848

49-
/** Returns the default Spark timeout to use for Rpc ask operations. */
49+
/** Returns the default Spark timeout to use for RPC ask operations. */
5050
def askTimeout(conf: SparkConf): FiniteDuration = {
51-
conf.getLong("spark.rpc.askTimeout", 30) seconds
51+
conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
5252
}
5353

54-
/** Returns the default Spark timeout to use for Rpc remote endpoint lookup. */
54+
/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
5555
def lookupTimeout(conf: SparkConf): FiniteDuration = {
56-
conf.getLong("spark.rpc.lookupTimeout", 30) seconds
56+
conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
5757
}
5858
}

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ package org.apache.spark
1919

2020
import java.util.concurrent.{TimeUnit, Executors}
2121

22+
import scala.concurrent.duration._
23+
import scala.language.postfixOps
2224
import scala.util.{Try, Random}
2325

2426
import org.scalatest.FunSuite
2527
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
26-
import org.apache.spark.util.ResetSystemProperties
28+
import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
2729
import com.esotericsoftware.kryo.Kryo
2830

2931
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -231,16 +233,16 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
231233
assert(!conf.contains("spark.rpc.lookupTimeout"))
232234

233235
conf.set("spark.akka.num.retries", "1")
234-
assert(conf.get("spark.rpc.num.retries") === "1")
236+
assert(RpcUtils.numRetries(conf) === 1)
235237

236238
conf.set("spark.akka.retry.wait", "2")
237-
assert(conf.get("spark.rpc.retry.wait") === "2")
239+
assert(RpcUtils.retryWaitMs(conf) === 2L)
238240

239241
conf.set("spark.akka.askTimeout", "3")
240-
assert(conf.get("spark.rpc.askTimeout") === "3")
242+
assert(RpcUtils.askTimeout(conf) === (3 seconds))
241243

242244
conf.set("spark.akka.lookupTimeout", "4")
243-
assert(conf.get("spark.rpc.lookupTimeout") === "4")
245+
assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
244246
}
245247
}
246248

0 commit comments

Comments
 (0)