Skip to content

Commit

Permalink
MetricsSystem refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Jul 24, 2013
1 parent c3daad3 commit 03f9871
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 149 deletions.
17 changes: 10 additions & 7 deletions core/src/main/scala/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import spark.deploy._
import spark.{Logging, SparkException, Utils}
import spark.metrics.MetricsSystem
import spark.util.AkkaUtils
import ui.MasterWebUI


private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor
with Logging with MasterInstrumentation {
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000

Expand All @@ -57,6 +57,8 @@ with Logging with MasterInstrumentation {
val webUi = new MasterWebUI(self, webUiPort)

Utils.checkHost(host, "Expected hostname")

val masterInstrumentation = new MasterInstrumentation(this)

val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
Expand All @@ -75,7 +77,7 @@ with Logging with MasterInstrumentation {
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())

initialize(this)
Master.metricsSystem.registerSource(masterInstrumentation)
}

override def postStop() {
Expand Down Expand Up @@ -319,21 +321,22 @@ with Logging with MasterInstrumentation {
removeWorker(worker)
}
}

override def postStop() {
uninitialize()
}
}

private[spark] object Master {
private val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

private val metricsSystem = MetricsSystem.createMetricsSystem("master")

def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)

metricsSystem.start()
actorSystem.awaitTermination()
metricsSystem.stop()
}

/** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
Expand Down
64 changes: 26 additions & 38 deletions core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
Original file line number Diff line number Diff line change
@@ -1,47 +1,35 @@
package spark.deploy.master

import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry}
import java.util.{Map, HashMap => JHashMap}

import spark.metrics.AbstractInstrumentation
import com.codahale.metrics.{Gauge, Metric}

private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
var masterInst: Option[Master] = None
val metricRegistry = new MetricRegistry()

override def registryHandler = metricRegistry

override def instance = "master"
import spark.metrics.source.Source

private[spark] class MasterInstrumentation(val master: Master) extends Source {
val className = classOf[Master].getName()
val instrumentationName = "master"
override def sourceName = instrumentationName

def initialize(master: Master) {
masterInst = Some(master)
override def getMetrics(): Map[String, Metric] = {
val gauges = new JHashMap[String, Metric]

// Register all the sources
registerSources()
// Gauge for worker numbers in cluster
gauges.put(className + ".workers.number", new Gauge[Int] {
override def getValue: Int = master.workers.size
})

// Register and start all the sinks
registerSinks()
}

def uninitialize() {
unregisterSinks()
// Gauge for application numbers in cluster
gauges.put(className + ".apps.number", new Gauge[Int] {
override def getValue: Int = master.apps.size
})

// Gauge for waiting application numbers in cluster
gauges.put(className + ".waiting_apps.number", new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})

gauges
}

// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0)
})

// Gauge for application numbers in cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0)
})

// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"),
new Gauge[Int] {
override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0)
})

}
12 changes: 9 additions & 3 deletions core/src/main/scala/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.util.duration._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
import spark.metrics.MetricsSystem
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
Expand All @@ -41,7 +42,7 @@ private[spark] class Worker(
memory: Int,
masterUrl: String,
workDirPath: String = null)
extends Actor with Logging with WorkerInstrumentation {
extends Actor with Logging {

Utils.checkHost(host, "Expected hostname")
assert (port > 0)
Expand All @@ -67,6 +68,8 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0

val workerInstrumentation = new WorkerInstrumentation(this)

def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed

Expand Down Expand Up @@ -99,7 +102,8 @@ private[spark] class Worker(
connectToMaster()
startWebUi()

initialize(this)
Worker.metricsSystem.registerSource(workerInstrumentation)
Worker.metricsSystem.start()
}

def connectToMaster() {
Expand Down Expand Up @@ -182,11 +186,13 @@ private[spark] class Worker(
executors.values.foreach(_.kill())
webUi.stop()

uninitialize()
Worker.metricsSystem.stop()
}
}

private[spark] object Worker {
private val metricsSystem = MetricsSystem.createMetricsSystem("worker")

def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
Expand Down
131 changes: 81 additions & 50 deletions core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
Original file line number Diff line number Diff line change
@@ -1,58 +1,89 @@
package spark.deploy.worker

import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry}
import com.codahale.metrics.{Gauge, Metric}

import spark.metrics.AbstractInstrumentation
import java.util.{Map, HashMap => JHashMap}

private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
var workerInst: Option[Worker] = None
val metricRegistry = new MetricRegistry()

override def registryHandler = metricRegistry

override def instance = "worker"
import spark.metrics.source.Source

private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
val className = classOf[Worker].getName()

def initialize(worker: Worker) {
workerInst = Some(worker)
override def sourceName = "worker"

// Register all the sources
registerSources()
override def getMetrics: Map[String, Metric] = {
val gauges = new JHashMap[String, Metric]

// Register and start all the sinks
registerSinks()
}

def uninitialize() {
unregisterSinks()
// Gauge for executors number
gauges.put(className + ".executor.number", new Gauge[Int]{
override def getValue: Int = worker.executors.size
})

gauges.put(className + ".core_used.number", new Gauge[Int]{
override def getValue: Int = worker.coresUsed
})

gauges.put(className + ".mem_used.MBytes", new Gauge[Int]{
override def getValue: Int = worker.memoryUsed
})

gauges.put(className + ".core_free.number", new Gauge[Int]{
override def getValue: Int = worker.coresFree
})

gauges.put(className + ".mem_free.MBytes", new Gauge[Int]{
override def getValue: Int = worker.memoryFree
})

gauges
}

// Gauge for executors number
metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
})

// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
})

// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
})

// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
})

  // Gauge for memory free of this worker
metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
})
}
}
//private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
// var workerInst: Option[Worker] = None
// val metricRegistry = new MetricRegistry()
//
// override def registryHandler = metricRegistry
//
// override def instance = "worker"
//
// def initialize(worker: Worker) {
// workerInst = Some(worker)
//
// registerSources()
// registerSinks()
// }
//
// def uninitialize() {
// unregisterSinks()
// }
//
// // Gauge for executors number
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
// })
//
// // Gauge for cores used of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
// })
//
// // Gauge for memory used of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
// })
//
// // Gauge for cores free of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
// })
//
//  // Gauge for memory free of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
// })
//}
Loading

0 comments on commit 03f9871

Please sign in to comment.