Skip to content

Commit

Permalink
[SPARK-33557][CORE][MESOS][TEST] Ensure the relationship between STOR…
Browse files Browse the repository at this point in the history
…AGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

### What changes were proposed in this pull request?
As described in SPARK-33557, `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend` will always use `Network.NETWORK_TIMEOUT.defaultValueString` as value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when we configure `NETWORK_TIMEOUT` without configure `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT`, this is different from the relationship described in `configuration.md`.

To fix this problem,the main change of this pr as follow:

- Remove the explicitly default value of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT`

- Use actual value of `NETWORK_TIMEOUT` as `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` when `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` not configured in `HeartbeatReceiver` and `MesosCoarseGrainedSchedulerBackend`

### Why are the changes needed?
To ensure the relationship between `NETWORK_TIMEOUT` and  `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` as we described in `configuration.md`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the Jenkins or GitHub Action

- Manual test configure `NETWORK_TIMEOUT` and `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` locally

Closes apache#30547 from LuciferYang/SPARK-33557.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
LuciferYang authored and HyukjinKwon committed Dec 2, 2020
1 parent 290aa02 commit 084d38b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 3 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new HashMap[String, Long]

private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
private val executorTimeoutMs = sc.conf.get(
config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s"))

private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ package object config {
.version("0.7.0")
.withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
.createOptional

private[spark] val STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT =
ConfigBuilder("spark.storage.cleanupFilesAfterExecutorExit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class ExecutorClassLoaderSuite
.setMaster("local")
.setAppName("executor-class-loader-test")
.set("spark.network.timeout", "11s")
.set("spark.network.timeoutInterval", "11s")
.set("spark.repl.class.outputDir", tempDir1.getAbsolutePath)
val sc = new SparkContext(conf)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Network
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
Expand Down Expand Up @@ -651,7 +652,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.registerDriverWithShuffleService(
agent.hostname,
externalShufflePort,
sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT),
sc.conf.get(
config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT
).getOrElse(Utils.timeStringAsMs(s"${sc.conf.get(Network.NETWORK_TIMEOUT)}s")),
sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
agent.shuffleRegistered = true
}
Expand Down

0 comments on commit 084d38b

Please sign in to comment.