From 5ce5dc9fcd7acf5c58dd3d456a629b01d57514e4 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 28 Jun 2013 14:48:21 +0800 Subject: [PATCH] Add default properties to deal with no configure file situation --- .../scala/spark/metrics/MetricsConfig.scala | 28 ++++++++++++------- .../scala/spark/metrics/MetricsSystem.scala | 9 ++---- .../spark/metrics/sink/ConsoleSink.scala | 6 ++-- .../scala/spark/metrics/sink/CsvSink.scala | 6 ++-- .../scala/spark/metrics/sink/JmxSink.scala | 6 ++-- 5 files changed, 30 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala index be4f670918ae4..7405192058e48 100644 --- a/core/src/main/scala/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala @@ -1,20 +1,25 @@ package spark.metrics import java.util.Properties -import java.io.FileInputStream +import java.io.{File, FileInputStream} import scala.collection.mutable import scala.util.matching.Regex private [spark] class MetricsConfig(val configFile: String) { val properties = new Properties() - var fis: FileInputStream = _ + // Add default properties in case there's no properties file + MetricsConfig.setDefaultProperties(properties) - try { - fis = new FileInputStream(configFile) - properties.load(fis) - } finally { - fis.close() + val confFile = new File(configFile) + if (confFile.exists()) { + var fis: FileInputStream = null + try { + fis = new FileInputStream(configFile) + properties.load(fis) + } finally { + fis.close() + } } val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX) @@ -35,11 +40,15 @@ private [spark] class MetricsConfig(val configFile: String) { } } -object MetricsConfig { - val DEFAULT_CONFIG_FILE = "conf/metrics.properties" +private[spark] object MetricsConfig { val DEFAULT_PREFIX = "*" val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r + def setDefaultProperties(prop: Properties) { + prop.setProperty("*.sink.jmx.enabled", "default") + prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource") + } + def subProperties(prop: Properties, regex: Regex) = { val subProperties = new mutable.HashMap[String, Properties] @@ -48,7 +57,6 @@ object MetricsConfig { if (regex.findPrefixOf(kv._1) != None) { val regex(prefix, suffix) = kv._1 subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2) - println(">>>>>subProperties added " + prefix + " " + suffix + " " + kv._2) } } diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala index 5bfdc00eaff32..6e448cb2a5765 100644 --- a/core/src/main/scala/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala @@ -14,7 +14,7 @@ import spark.metrics.source._ private[spark] class MetricsSystem private (val instance: String) extends Logging { initLogging() - val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE) + val confFile = System.getProperty("spark.metrics.conf.file", "unsupported") val metricsConfig = new MetricsConfig(confFile) val sinks = new mutable.ArrayBuffer[Sink] @@ -58,9 +58,6 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) - // Register JMX sink as a default sink - sinks += new JmxSink(registry) - // Register other sinks according to conf sinkConfigs.foreach { kv => val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) { @@ -81,9 +78,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin } private[spark] object MetricsSystem { - val DEFAULT_SINKS = Map( - "console" -> "spark.metrics.sink.ConsoleSink", - "csv" -> "spark.metrics.sink.CsvSink") + val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink") val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala index e2e4197d1dcad..d7b7a9e501589 100644 --- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala @@ -1,10 +1,10 @@ package spark.metrics.sink +import com.codahale.metrics.{ConsoleReporter, MetricRegistry} + import java.util.Properties import java.util.concurrent.TimeUnit -import com.codahale.metrics.{ConsoleReporter, MetricRegistry} - import spark.metrics.MetricsSystem class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink { @@ -18,7 +18,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT) } - var reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) + val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala index c2d645331c211..e6c5bffd3c410 100644 --- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala @@ -1,11 +1,11 @@ package spark.metrics.sink +import com.codahale.metrics.{CsvReporter, MetricRegistry} + import java.io.File import java.util.{Locale, Properties} import java.util.concurrent.TimeUnit -import com.codahale.metrics.{CsvReporter, MetricRegistry} - import spark.metrics.MetricsSystem class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink { @@ -24,7 +24,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si case None => CsvSink.CSV_DEFAULT_DIR } - var reporter: CsvReporter = CsvReporter.forRegistry(registry) + val reporter: CsvReporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala index 98b55f7b7f5d2..f097a631c0c0d 100644 --- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala @@ -1,9 +1,11 @@ package spark.metrics.sink +import java.util.Properties + import com.codahale.metrics.{JmxReporter, MetricRegistry} -class JmxSink(registry: MetricRegistry) extends Sink { - var reporter: JmxReporter = JmxReporter.forRegistry(registry).build() +class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() override def start() { reporter.start()