[SPARK-21321][SPARK CORE] Spark very verbose on shutdown
## What changes were proposed in this pull request?

The current code is very verbose on shutdown.

The change I propose is to lower the log level when the driver is shutting down and the RPC connections are closed (RpcEnvStoppedException).
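
Every change in the diffs below applies the same pattern: failures that are expected while the RPC environment is shutting down (`RpcEnvStoppedException`) are demoted to debug, while genuinely unexpected failures keep their warning. Here is a minimal, self-contained sketch of that pattern; the log helpers and the exception class are stand-ins for Spark's `Logging` trait and `org.apache.spark.rpc.RpcEnvStoppedException`, not the real implementations:

```scala
object ShutdownLogging {

  // Stand-in for org.apache.spark.rpc.RpcEnvStoppedException.
  class RpcEnvStoppedException extends IllegalStateException("RpcEnv already stopped.")

  // Stand-ins for the logDebug/logWarning helpers of Spark's Logging trait.
  private def logDebug(msg: => String): Unit = println(s"DEBUG $msg")
  private def logWarning(msg: => String): Unit = println(s"WARN  $msg")

  // The callback shape used when a message cannot be delivered:
  // expected shutdown noise goes to debug, everything else to warning.
  def onDropped(message: Any)(e: Throwable): Unit = e match {
    case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
    case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
  }

  def main(args: Array[String]): Unit = {
    onDropped("StatusUpdate")(new RpcEnvStoppedException)   // -> DEBUG
    onDropped("StatusUpdate")(new RuntimeException("boom")) // -> WARN
  }
}
```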

## How was this patch tested?

Tested with a word count job (deploy-mode = cluster, master = yarn, num-executors = 4) over 300 GB of data.

Author: John Lee <jlee2@yahoo-inc.com>

Closes #18547 from yoonlee95/SPARK-21321.

(cherry picked from commit 0e07a29)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
John Lee authored and Tom Graves committed Jul 17, 2017
1 parent 8e85ce6 commit 0ef98fd
Showing 6 changed files with 27 additions and 9 deletions.
```diff
@@ -109,8 +109,11 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val iter = endpoints.keySet().iterator()
     while (iter.hasNext) {
       val name = iter.next
-      postMessage(name, message, (e) => logWarning(s"Message $message dropped. ${e.getMessage}"))
-    }
+      postMessage(name, message, (e) => { e match {
+        case e: RpcEnvStoppedException => logDebug (s"Message $message dropped. ${e.getMessage}")
+        case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
+      }}
+      )}
   }
 
   /** Posts a message sent by a remote endpoint. */
```
core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala (6 additions & 1 deletion)
```diff
@@ -205,7 +205,12 @@ private[netty] class Inbox(
     try action catch {
       case NonFatal(e) =>
         try endpoint.onError(e) catch {
-          case NonFatal(ee) => logError(s"Ignoring error", ee)
+          case NonFatal(ee) =>
+            if (stopped) {
+              logDebug("Ignoring error", ee)
+            } else {
+              logError("Ignoring error", ee)
+            }
         }
     }
   }
```
```diff
@@ -185,7 +185,7 @@ private[netty] class NettyRpcEnv(
       try {
         dispatcher.postOneWayMessage(message)
       } catch {
-        case e: RpcEnvStoppedException => logWarning(e.getMessage)
+        case e: RpcEnvStoppedException => logDebug(e.getMessage)
       }
     } else {
       // Message to a remote RPC endpoint.
@@ -203,7 +203,10 @@
 
     def onFailure(e: Throwable): Unit = {
       if (!promise.tryFailure(e)) {
-        logWarning(s"Ignored failure: $e")
+        e match {
+          case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
+          case _ => logWarning(s"Ignored failure: $e")
+        }
       }
     }
 
```
```diff
@@ -45,7 +45,7 @@ private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends Outbo
 
   override def onFailure(e: Throwable): Unit = {
     e match {
-      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
+      case e1: RpcEnvStoppedException => logDebug(e1.getMessage)
       case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
     }
   }
```
```diff
@@ -121,7 +121,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   def post(event: SparkListenerEvent): Unit = {
     if (stopped.get) {
       // Drop further events to make `listenerThread` exit ASAP
-      logError(s"$name has already stopped! Dropping event $event")
+      logDebug(s"$name has already stopped! Dropping event $event")
       return
     }
     val eventAdded = eventQueue.offer(event)
```
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.util.concurrent.atomic.{AtomicBoolean}
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
@@ -40,6 +42,8 @@ private[spark] abstract class YarnSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
+  private val stopped = new AtomicBoolean(false)
+
   override val minRegisteredRatio =
     if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
       0.8
@@ -93,6 +97,7 @@
       requestTotalExecutors(0, 0, Map.empty)
       super.stop()
     } finally {
+      stopped.set(true)
       services.stop()
     }
   }
@@ -206,8 +211,10 @@
    */
   override def onDisconnected(rpcAddress: RpcAddress): Unit = {
     addressToExecutorId.get(rpcAddress).foreach { executorId =>
-      if (disableExecutor(executorId)) {
-        yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+      if (!stopped.get) {
+        if (disableExecutor(executorId)) {
+          yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+        }
       }
     }
   }
```
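
The YARN backend change above is a guard rather than a log-level tweak: a `stopped` flag, set at the end of `stop()`, suppresses the executor-disconnect handling that would otherwise fire once per executor while the driver shuts down. A compact sketch of that guard follows; apart from `stopped`, the names (`ShutdownAwareBackend`, `handleDisconnect`) are hypothetical stand-ins for the real `disableExecutor`/`handleExecutorDisconnectedFromDriver` machinery:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch of the shutdown guard added to YarnSchedulerBackend; names other
// than `stopped` are hypothetical stand-ins for the real scheduler pieces.
class ShutdownAwareBackend {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = {
    try {
      // ... ask for zero executors, stop the parent backend ...
    } finally {
      stopped.set(true) // set in `finally` so callbacks see it even if stop() throws
    }
  }

  def onDisconnected(executorId: String): Unit = {
    // During shutdown every executor disconnects; that is not a failure.
    if (!stopped.get) {
      handleDisconnect(executorId)
    }
  }

  private def handleDisconnect(executorId: String): Unit =
    println(s"Executor $executorId disconnected unexpectedly")
}
```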
