Add Master and Worker instrumentation support
jerryshao committed Jul 24, 2013
1 parent 503acd3 commit 9dec8c7
Showing 4 changed files with 115 additions and 4 deletions.
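Both new traits extend spark.metrics.AbstractInstrumentation, which is not part of this diff. Judging only from the members they use (registryHandler, instance, registerSinks, unregisterSinks), a minimal sketch of such a base trait might look like the following; the console reporter is a stand-in assumption for illustration, not the committed sink implementation.

package spark.metrics

import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Hypothetical sketch of the base trait referenced by this commit.
private[spark] trait AbstractInstrumentation {
  // Supplied by the concrete trait (MasterInstrumentation / WorkerInstrumentation).
  def registryHandler: MetricRegistry
  def instance: String

  private var reporter: Option[ConsoleReporter] = None

  // Assumed behavior: attach configured sinks to the registry. A console
  // reporter stands in for whatever sinks the real trait wires up.
  def registerSinks() {
    val r = ConsoleReporter.forRegistry(registryHandler).build()
    r.start(10, TimeUnit.SECONDS)
    reporter = Some(r)
  }

  def unregisterSinks() {
    reporter.foreach(_.stop())
  }
}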
9 changes: 8 additions & 1 deletion core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,7 +33,8 @@ import spark.util.AkkaUtils
import ui.MasterWebUI


private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor
with Logging with MasterInstrumentation {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000

@@ -73,6 +74,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())

initialize(this)
}

override def postStop() {
@@ -316,6 +319,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}

override def postStop() {
uninitialize()
}
}

private[spark] object Master {
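Note: Master.scala already defines postStop() (visible as context in the first hunk above), so the class appears to end up with two postStop() overrides after this change, which would not compile as written. Presumably the teardown belongs inside the existing hook; a sketch of the merged version, where webUi.stop() stands in for the existing body this diff does not show:

override def postStop() {
  webUi.stop()  // assumed existing cleanup; not shown in this diff
  uninitialize()
}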
44 changes: 44 additions & 0 deletions core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -0,0 +1,44 @@
package spark.deploy.master

import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry}

import spark.metrics.AbstractInstrumentation

private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
var masterInst: Option[Master] = None
val metricRegistry = new MetricRegistry()

override def registryHandler = metricRegistry

override def instance = "master"

def initialize(master: Master) {
masterInst = Some(master)

// Register and start all the sinks
registerSinks()
}

def uninitialize() {
unregisterSinks
}

// Gauge for the number of workers in the cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0)
})

// Gauge for the number of applications in the cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0)
})

// Gauge for the number of waiting applications in the cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0)
})

}
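For reference, MetricRegistry.name(classOf[Master], "workers", "number") joins the fully qualified class name with the extra parts, so the gauges above are published under names like spark.deploy.master.Master.workers.number. A standalone check of the naming helper, using only the Codahale API:

import com.codahale.metrics.MetricRegistry

object MetricNameDemo {
  def main(args: Array[String]) {
    // Same helper the gauges above use; prints "java.lang.String.workers.number".
    println(MetricRegistry.name(classOf[String], "workers", "number"))
  }
}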
11 changes: 8 additions & 3 deletions core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -41,7 +41,7 @@ private[spark] class Worker(
memory: Int,
masterUrl: String,
workDirPath: String = null)
extends Actor with Logging {
extends Actor with Logging with WorkerInstrumentation {

Utils.checkHost(host, "Expected hostname")
assert (port > 0)
@@ -97,6 +97,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
startWebUi()

initialize(this)
}

def connectToMaster() {
@@ -155,10 +158,10 @@

case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()

case RequestWorkerState => {
sender ! WorkerState(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
}
@@ -178,6 +181,8 @@ override def postStop() {
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()

uninitialize()
}
}

55 changes: 55 additions & 0 deletions core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -0,0 +1,55 @@
package spark.deploy.worker

import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry}

import spark.metrics.AbstractInstrumentation

private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
var workerInst: Option[Worker] = None
val metricRegistry = new MetricRegistry()

override def registryHandler = metricRegistry

override def instance = "worker"

def initialize(worker: Worker) {
workerInst = Some(worker)

// Register and start all the sinks
registerSinks()
}

def uninitialize() {
unregisterSinks()
}

// Gauge for the number of executors on this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
})

// Gauge for cores used by this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
})

// Gauge for memory used by this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
})

// Gauge for free cores on this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
})

// Gauge for free memory on this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
})
}
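Both new files import JmxReporter but never use it. If JMX exposure is the eventual goal, one reporter per registry is enough; a sketch of how it could be wired into the register/unregister hooks above (the object and names here are illustrative, not part of the commit):

import com.codahale.metrics.{JmxReporter, MetricRegistry}

object JmxWiringSketch {
  val registry = new MetricRegistry()  // stands in for the trait's metricRegistry
  private val reporter = JmxReporter.forRegistry(registry).build()

  def start() { reporter.start() }  // gauges appear as MBeans under the "metrics" domain
  def stop() { reporter.stop() }
}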
