
Commit c3daad3

Update metric source support for instrumentation
1 parent 9dec8c7 commit c3daad3

9 files changed, +70 -14 lines

conf/metrics.properties

Lines changed: 3 additions & 1 deletion
@@ -1,9 +1,11 @@
-# syntax: [prefix].[sink].[instance].[options]
+# syntax: [prefix].[sink|source].[instance].[options]
 
 *.sink.console.period=10
 
 *.sink.console.unit=second
 
+master.source.jvm.class=spark.metrics.source.JvmSource
+
 master.sink.console.period=10
 
 master.sink.console.unit=second
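
The syntax line now covers both halves of the metrics pipeline: sources feed metrics into a registry, and sinks report them out. Below is a sketch of a fuller configuration under the new syntax. The csv lines are illustrative only, using the "csv" shorthand from DEFAULT_SINKS and the period/unit/directory keys defined in CsvSink further down; they are not part of this commit.

# Every instance: report to the console every 10 seconds
*.sink.console.period=10
*.sink.console.unit=second

# Master only: register the JVM source added by this commit
master.source.jvm.class=spark.metrics.source.JvmSource

# Master only (illustrative): also write CSV files under /tmp
master.sink.csv.period=10
master.sink.csv.unit=second
master.sink.csv.directory=/tmp/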

core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala

Lines changed: 5 additions & 2 deletions
@@ -15,12 +15,15 @@ private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
   def initialize(master: Master) {
     masterInst = Some(master)
 
+    // Register all the sources
+    registerSources()
+
     // Register and start all the sinks
-    registerSinks
+    registerSinks()
   }
 
   def uninitialize() {
-    unregisterSinks
+    unregisterSinks()
   }
 
   // Gauge for worker numbers in cluster

core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala

Lines changed: 4 additions & 1 deletion
@@ -15,6 +15,9 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
   def initialize(worker: Worker) {
     workerInst = Some(worker)
 
+    // Register all the sources
+    registerSources()
+
     // Register and start all the sinks
     registerSinks()
   }
@@ -36,7 +39,7 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
     })
 
     // Gauge for memory used of this worker
-    metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"),
+    metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
       new Gauge[Int] {
         override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
       })

core/src/main/scala/spark/metrics/AbstractInstrumentation.scala

Lines changed: 26 additions & 3 deletions
@@ -9,17 +9,39 @@ import java.util.concurrent.TimeUnit
 
 import spark.Logging
 import spark.metrics.sink._
+import spark.metrics.source._
 
-trait AbstractInstrumentation extends Logging {
+private [spark] trait AbstractInstrumentation extends Logging {
   initLogging()
 
+  // Get MetricRegistry handler
   def registryHandler: MetricRegistry
+  // Get the instance name
   def instance: String
 
   val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
   val metricsConfig = new MetricsConfig(confFile)
 
   val sinks = new mutable.ArrayBuffer[Sink]
+  val sources = new mutable.ArrayBuffer[Source]
+
+  def registerSources() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+
+    // Register all the sources
+    sourceConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
+          .newInstance(registryHandler)
+        sources += source.asInstanceOf[Source]
+      } catch {
+        case e: Exception => logError("source class " + classPath + " cannot be instantiated", e)
+      }
+    }
+    sources.foreach(_.registerSource)
+  }
 
   def registerSinks() {
     val instConfig = metricsConfig.getInstance(instance)
@@ -33,17 +55,17 @@ trait AbstractInstrumentation extends Logging {
       val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
         AbstractInstrumentation.DEFAULT_SINKS(kv._1)
       } else {
+        // For a non-default sink, a "class" property must be set; the sink is created via reflection
        kv._2.getProperty("class")
       }
       try {
         val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registryHandler)
         sinks += sink.asInstanceOf[Sink]
       } catch {
-        case e: Exception => logError("class " + classPath + "cannot be instantialize", e)
+        case e: Exception => logError("sink class " + classPath + " cannot be instantiated", e)
       }
     }
-
     sinks.foreach(_.registerSink)
   }
 
@@ -58,6 +80,7 @@ object AbstractInstrumentation {
     "csv" -> "spark.metrics.sink.CsvSink")
 
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
 
   val timeUnits = Map(
     "millisecond" -> TimeUnit.MILLISECONDS,

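The reflection call in registerSources() above fixes the contract for user-defined sources: the configured class must expose a public constructor taking a single MetricRegistry and mix in the Source trait. A minimal sketch of such a source follows; the UptimeSource class and its gauge are hypothetical, not part of this commit.

package spark.metrics.source

import com.codahale.metrics.{Gauge, MetricRegistry}

// Hypothetical user-defined source; wired up via e.g.
//   master.source.uptime.class=spark.metrics.source.UptimeSource
class UptimeSource(registry: MetricRegistry) extends Source {
  private val startTime = System.currentTimeMillis

  override def registerSource() {
    // Gauge for seconds elapsed since this source was created
    registry.register(MetricRegistry.name(classOf[UptimeSource], "uptime", "seconds"),
      new Gauge[Long] {
        override def getValue: Long = (System.currentTimeMillis - startTime) / 1000
      })
  }
}
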
core/src/main/scala/spark/metrics/MetricsConfig.scala

Lines changed: 7 additions & 5 deletions
@@ -6,7 +6,7 @@ import java.io.FileInputStream
 import scala.collection.mutable
 import scala.util.matching.Regex
 
-class MetricsConfig(val configFile: String) {
+private [spark] class MetricsConfig(val configFile: String) {
   val properties = new Properties()
   var fis: FileInputStream = _
 
@@ -36,7 +36,7 @@ class MetricsConfig(val configFile: String) {
 }
 
 object MetricsConfig {
-  val DEFAULT_CONFIG_FILE = "/home/jerryshao/project/sotc_cloud-spark/conf/metrics.properties"
+  val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
 
@@ -45,9 +45,11 @@
 
     import scala.collection.JavaConversions._
     prop.foreach { kv =>
-      val regex(a, b) = kv._1
-      subProperties.getOrElseUpdate(a, new Properties).setProperty(b, kv._2)
-      println(">>>>>subProperties added " + a + " " + b + " " + kv._2)
+      if (regex.findPrefixOf(kv._1) != None) {
+        val regex(prefix, suffix) = kv._1
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+        println(">>>>>subProperties added " + prefix + " " + suffix + " " + kv._2)
+      }
     }
 
     subProperties
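
The new findPrefixOf guard matters because an instance's properties mix sink.* and source.* keys, and matching SOURCE_REGEX against a sink key previously threw a MatchError. A standalone sketch of the grouping logic, using the same regex and calls as the fixed code (the property values are made up):

import java.util.Properties

import scala.collection.mutable

object SubPropertiesDemo {
  def main(args: Array[String]) {
    val prop = new Properties()
    prop.setProperty("source.jvm.class", "spark.metrics.source.JvmSource")
    prop.setProperty("sink.console.period", "10")

    val regex = "^source\\.(.+)\\.(.+)".r
    val subProperties = new mutable.HashMap[String, Properties]

    import scala.collection.JavaConversions._
    prop.foreach { kv =>
      // The guard skips non-matching keys (sink.*) instead of throwing a MatchError
      if (regex.findPrefixOf(kv._1) != None) {
        val regex(prefix, suffix) = kv._1
        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
      }
    }

    // Prints: jvm -> {class=spark.metrics.source.JvmSource}
    subProperties.foreach { case (k, v) => println(k + " -> " + v) }
  }
}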

core/src/main/scala/spark/metrics/sink/CsvSink.scala

Lines changed: 2 additions & 2 deletions
@@ -46,8 +46,8 @@ object CsvSink {
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
 
-  val CSV_DEFAULT_PERIOD = "1"
-  val CSV_DEFAULT_UNIT = "minute"
+  val CSV_DEFAULT_PERIOD = "10"
+  val CSV_DEFAULT_UNIT = "second"
   val CSV_DEFAULT_DIR = "/tmp/"
 }
 
core/src/main/scala/spark/metrics/sink/Sink.scala

Lines changed: 1 addition & 0 deletions
@@ -2,5 +2,6 @@ package spark.metrics.sink
 
 trait Sink {
   def registerSink: Unit
+
   def unregisterSink: Unit
 }
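
The counterpart contract for sinks, set by registerSinks() above, is a (Properties, MetricRegistry) constructor plus this trait. A hypothetical sink built on codahale's ConsoleReporter, roughly what a console-style sink can look like; the StdoutSink name and its period default are assumptions, not code from this commit:

package spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Hypothetical third-party sink; configured via e.g.
//   master.sink.stdout.class=spark.metrics.sink.StdoutSink
class StdoutSink(val property: Properties, val registry: MetricRegistry) extends Sink {
  // Poll period in seconds, read from the sink's properties (default assumed to be 10)
  val pollPeriod = property.getProperty("period", "10").toInt

  val reporter = ConsoleReporter.forRegistry(registry)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .convertRatesTo(TimeUnit.SECONDS)
    .build()

  override def registerSink {
    reporter.start(pollPeriod, TimeUnit.SECONDS)
  }

  override def unregisterSink {
    reporter.stop()
  }
}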
core/src/main/scala/spark/metrics/source/JvmSource.scala

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+
+class JvmSource(registry: MetricRegistry) extends Source {
+  // Initialize memory usage gauges for the JVM
+  val memUsageMetricSet = new MemoryUsageGaugeSet
+
+  // Initialize garbage collection gauges for the JVM
+  val gcMetricSet = new GarbageCollectorMetricSet
+
+  override def registerSource() {
+    registry.registerAll(memUsageMetricSet)
+    registry.registerAll(gcMetricSet)
+  }
+}
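
Given the master.source.jvm.class line added to conf/metrics.properties, registerSources() will instantiate JvmSource reflectively. It can also be exercised standalone; a sketch (the demo object is hypothetical):

import com.codahale.metrics.MetricRegistry

import spark.metrics.source.JvmSource

object JvmSourceDemo {
  def main(args: Array[String]) {
    // Register the JVM memory and GC metric sets on a fresh registry
    val registry = new MetricRegistry()
    new JvmSource(registry).registerSource()

    // Prints gauge names such as heap.used and per-collector GC counts/times
    import scala.collection.JavaConversions._
    registry.getGauges.keySet.foreach(println)
  }
}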
core/src/main/scala/spark/metrics/source/Source.scala

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+package spark.metrics.source
+
+trait Source {
+  def registerSource: Unit
+}
