Commit f6b1ebe

[SPARK-2952] Enable logging actor messages at DEBUG level
1 parent e45daf2

File tree

13 files changed: +94 lines, -38 lines
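
Only ten of the thirteen changed files are rendered below; the new trait itself, org.apache.spark.util.ActorLogReceive, is presumably among the rest. The following is a minimal sketch of that trait, inferred from its usages in this diff rather than copied from the commit: it overrides receive, logs every incoming message at DEBUG level, and delegates handling to the concrete receiveWithLogging.

package org.apache.spark.util

import akka.actor.Actor
import org.slf4j.Logger

private[spark] trait ActorLogReceive {
  self: Actor =>

  // Wrap the concrete handler so each message is logged before and
  // after it is processed.
  override def receive: Actor.Receive = new Actor.Receive {
    private val _receiveWithLogging = receiveWithLogging

    override def isDefinedAt(o: Any): Boolean = _receiveWithLogging.isDefinedAt(o)

    override def apply(o: Any): Unit = {
      if (log.isDebugEnabled) {
        log.debug(s"[actor] received message $o from ${self.sender}")
      }
      _receiveWithLogging.apply(o)
      if (log.isDebugEnabled) {
        log.debug(s"[actor] handled message: $o")
      }
    }
  }

  // Subclasses implement this instead of receive.
  def receiveWithLogging: Actor.Receive

  // Usually supplied by mixing in Logging; can also be overridden
  // explicitly (see DriverActor below).
  protected def log: Logger
}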

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 2 deletions
@@ -21,6 +21,7 @@ import akka.actor.Actor
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
-  override def receive = {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+  extends Actor with ActorLogReceive with Logging {
+
+  override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
         !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 2 deletions
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 /** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
-  def receive = {
+  override def receiveWithLogging = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 5 additions & 3 deletions
@@ -27,12 +27,14 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
 */
-private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
+  extends Actor with ActorLogReceive with Logging {
+
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 
@@ -114,7 +116,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)

core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala

Lines changed: 3 additions & 3 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -56,7 +56,7 @@ private[spark] class AppClient(
   var registered = false
   var activeMasterUrl: String = null
 
-  class ClientActor extends Actor with Logging {
+  class ClientActor extends Actor with ActorLogReceive with Logging {
     var master: ActorSelection = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
@@ -119,7 +119,7 @@ private[spark] class AppClient(
         .contains(remoteUrl.hostPort)
     }
 
-    override def receive = {
+    override def receiveWithLogging = {
      case RegisteredApplication(appId_, masterUrl) =>
        appId = appId_
        registered = true

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 3 deletions
@@ -42,14 +42,14 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
 
   import context.dispatcher   // to use Akka's scheduler.schedule()
 
@@ -167,7 +167,7 @@ private[spark] class Master(
     context.stop(leaderElectionAgent)
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 3 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -51,7 +51,7 @@ private[spark] class Worker(
     workDirPath: String = null,
     val conf: SparkConf,
     val securityMgr: SecurityManager)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   import context.dispatcher
 
   Utils.checkHost(host, "Expected hostname")
@@ -187,7 +187,7 @@ private[spark] class Worker(
     }
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true

core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala

Lines changed: 5 additions & 3 deletions
@@ -22,13 +22,15 @@ import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, Di
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.DeployMessages.SendHeartbeat
+import org.apache.spark.util.ActorLogReceive
 
 /**
  * Actor which connects to a worker process and terminates the JVM if the connection is severed.
 * Provides fate sharing between a worker and its associated child processes.
 */
-private[spark] class WorkerWatcher(workerUrl: String) extends Actor
-  with Logging {
+private[spark] class WorkerWatcher(workerUrl: String)
+  extends Actor with ActorLogReceive with Logging {
+
  override def preStart() {
   context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
@@ -48,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String) extends Actor
 
   def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
 
-  override def receive = {
+  override def receiveWithLogging = {
    case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
      logInfo(s"Successfully connected to $workerUrl")
 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 4 additions & 3 deletions
@@ -31,14 +31,15 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {
+    sparkProperties: Seq[(String, String)])
+  extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
 
@@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
-  override def receive = {
+  override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 3 deletions
@@ -30,7 +30,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
 import org.apache.spark.ui.JettyUtils
 
 /**
@@ -61,7 +61,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
 
-  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+  class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
+
+    override protected def log = CoarseGrainedSchedulerBackend.this.log
+
     private val executorActor = new HashMap[String, ActorRef]
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
@@ -79,7 +82,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
 
-    def receive = {
+    def receiveWithLogging = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
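
Note the one wrinkle in this file: DriverActor is an inner class that does not mix in Logging, so it satisfies the trait's abstract log member by delegating to the enclosing backend's logger. A hypothetical minimal pair showing both ways to supply log (PingActor, OuterBackend, and InnerActor are illustrative names, not from the commit):

import akka.actor.Actor
import org.apache.spark.Logging
import org.apache.spark.util.ActorLogReceive

// 1. The common case: mix in Logging, which supplies `log` directly.
class PingActor extends Actor with ActorLogReceive with Logging {
  override def receiveWithLogging = {
    case msg => logInfo(s"got $msg")
  }
}

// 2. The DriverActor case: no Logging mixin on the actor itself;
//    delegate to the enclosing class's logger instead, so messages are
//    attributed to the outer component's log name.
class OuterBackend extends Logging {
  class InnerActor extends Actor with ActorLogReceive {
    override protected def log = OuterBackend.this.log
    override def receiveWithLogging = {
      case msg => sender ! msg // echo back, for illustration
    }
  }
}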

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 4 additions & 4 deletions
@@ -23,9 +23,9 @@ import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.{TaskMetrics, Executor, ExecutorBackend}
+import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ActorLogReceive
 
 private case class ReviveOffers()
 
@@ -43,7 +43,7 @@ private case class StopExecutor()
 private[spark] class LocalActor(
   scheduler: TaskSchedulerImpl,
   executorBackend: LocalBackend,
-  private val totalCores: Int) extends Actor with Logging {
+  private val totalCores: Int) extends Actor with ActorLogReceive with Logging {
 
   private var freeCores = totalCores
 
@@ -53,7 +53,7 @@ private[spark] class LocalActor(
   val executor = new Executor(
     localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
 
-  def receive = {
+  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()
 
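None of this output appears at Spark's default INFO level; to surface the new messages, raise the relevant logger to DEBUG. An illustrative conf/log4j.properties for the log4j 1.x setup Spark used at the time (the logger names and pattern are typical defaults, not part of this commit):

# Illustrative log4j.properties: turn Spark's classes up to DEBUG so
# the "[actor] received/handled message" lines become visible.
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# DEBUG for Spark only, so third-party chatter stays at INFO.
log4j.logger.org.apache.spark=DEBUG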