diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 61f00041296b8..5c55034e88df5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -303,15 +303,15 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric = StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes", - "estimated size of state only on current version", "bytes") + "estimated size of state only on current version") private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric = StateStoreCustomSumMetric("loadedMapCacheHitCount", - "count of cache hit on states cache in provider", "hit count") + "count of cache hit on states cache in provider") private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric = StateStoreCustomSumMetric("loadedMapCacheMissCount", - "count of cache miss on states cache in provider", "hit count") + "count of cache miss on states cache in provider") private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 084ddf8077a15..05bcee7b05c6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -178,15 +178,11 @@ object StateStoreMetrics { trait StateStoreCustomMetric { def name: String def desc: String - def unit: String = "" } -case class StateStoreCustomSumMetric(name: String, desc: String, override val unit: String) - extends StateStoreCustomMetric -case class StateStoreCustomSizeMetric(name: String, desc: String, override val unit: String) - extends StateStoreCustomMetric -case class StateStoreCustomTimingMetric(name: String, desc: String, override val unit: String) - extends StateStoreCustomMetric +case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric +case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric +case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric /** * An exception thrown when an invalid UnsafeRow is detected in state store. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 8cf3739e11150..dae771c613131 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -313,11 +313,11 @@ class SymmetricHashJoinStateManager( keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes, keyWithIndexToValueMetrics.customMetrics.map { - case (s @ StateStoreCustomSumMetric(_, desc, _), value) => + case (s @ StateStoreCustomSumMetric(_, desc), value) => s.copy(desc = newDesc(desc)) -> value - case (s @ StateStoreCustomSizeMetric(_, desc, _), value) => + case (s @ StateStoreCustomSizeMetric(_, desc), value) => s.copy(desc = newDesc(desc)) -> value - case (s @ StateStoreCustomTimingMetric(_, desc, _), value) => + case (s @ StateStoreCustomTimingMetric(_, desc), value) => s.copy(desc = newDesc(desc)) -> value case (s, _) => throw new IllegalArgumentException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 9a5183a22d23d..1449d937982e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -126,11 +126,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => private def stateStoreCustomMetrics: Map[String, SQLMetric] = { val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass) provider.supportedCustomMetrics.map { - case StateStoreCustomSumMetric(name, desc, _) => + case StateStoreCustomSumMetric(name, desc) => name -> SQLMetrics.createMetric(sparkContext, desc) - case StateStoreCustomSizeMetric(name, desc, _) => + case StateStoreCustomSizeMetric(name, desc) => name -> SQLMetrics.createSizeMetric(sparkContext, desc) - case StateStoreCustomTimingMetric(name, desc, _) => + case StateStoreCustomTimingMetric(name, desc) => name -> SQLMetrics.createTimingMetric(sparkContext, desc) }.toMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index 966de0cdae83e..80c44159e8248 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -37,9 +37,9 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) // State store provider implementation mustn't do any heavyweight initialiation in constructor // but in its init method. - private val supportedMetrics = StateStoreProvider.create( + private val supportedCustomMetrics = StateStoreProvider.create( parent.parent.conf.get(STATE_STORE_PROVIDER_CLASS)).supportedCustomMetrics - logDebug(s"Supported metrics: $supportedMetrics") + logDebug(s"Supported custom metrics: $supportedCustomMetrics") def generateLoadResources(request: HttpServletRequest): Seq[Node] = { // scalastyle:off @@ -272,7 +272,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) val data = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), p.stateOperators.map(_.customMetrics.get(metricName).toDouble).sum)) val max = data.maxBy(_._2)._2 - val metric = supportedMetrics.find(_.name == metricName).get + val metric = supportedCustomMetrics.find(_.name == metricName).get val graphUIData = new GraphUIData( @@ -283,7 +283,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) maxBatchTime, 0, max, - metric.unit) + "") graphUIData.generateDataJs(jsCollector) result ++=