Skip to content

Commit

Permalink
Remove unit + rename a variable
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Nov 20, 2020
1 parent bc83741 commit 9a87255
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit

private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
"estimated size of state only on current version", "bytes")
"estimated size of state only on current version")

private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheHitCount",
"count of cache hit on states cache in provider", "hit count")
"count of cache hit on states cache in provider")

private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheMissCount",
"count of cache miss on states cache in provider", "hit count")
"count of cache miss on states cache in provider")

private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,11 @@ object StateStoreMetrics {
trait StateStoreCustomMetric {
def name: String
def desc: String
def unit: String = ""
}

case class StateStoreCustomSumMetric(name: String, desc: String, override val unit: String)
extends StateStoreCustomMetric
case class StateStoreCustomSizeMetric(name: String, desc: String, override val unit: String)
extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String, override val unit: String)
extends StateStoreCustomMetric
case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric

/**
* An exception thrown when an invalid UnsafeRow is detected in state store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ class SymmetricHashJoinStateManager(
keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once
keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
keyWithIndexToValueMetrics.customMetrics.map {
case (s @ StateStoreCustomSumMetric(_, desc, _), value) =>
case (s @ StateStoreCustomSumMetric(_, desc), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomSizeMetric(_, desc, _), value) =>
case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomTimingMetric(_, desc, _), value) =>
case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s, _) =>
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
provider.supportedCustomMetrics.map {
case StateStoreCustomSumMetric(name, desc, _) =>
case StateStoreCustomSumMetric(name, desc) =>
name -> SQLMetrics.createMetric(sparkContext, desc)
case StateStoreCustomSizeMetric(name, desc, _) =>
case StateStoreCustomSizeMetric(name, desc) =>
name -> SQLMetrics.createSizeMetric(sparkContext, desc)
case StateStoreCustomTimingMetric(name, desc, _) =>
case StateStoreCustomTimingMetric(name, desc) =>
name -> SQLMetrics.createTimingMetric(sparkContext, desc)
}.toMap
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)

// State store provider implementation mustn't do any heavyweight initialiation in constructor
// but in its init method.
private val supportedMetrics = StateStoreProvider.create(
private val supportedCustomMetrics = StateStoreProvider.create(
parent.parent.conf.get(STATE_STORE_PROVIDER_CLASS)).supportedCustomMetrics
logDebug(s"Supported metrics: $supportedMetrics")
logDebug(s"Supported custom metrics: $supportedCustomMetrics")

def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
// scalastyle:off
Expand Down Expand Up @@ -272,7 +272,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
val data = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.customMetrics.get(metricName).toDouble).sum))
val max = data.maxBy(_._2)._2
val metric = supportedMetrics.find(_.name == metricName).get
val metric = supportedCustomMetrics.find(_.name == metricName).get

val graphUIData =
new GraphUIData(
Expand All @@ -283,7 +283,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
maxBatchTime,
0,
max,
metric.unit)
"")
graphUIData.generateDataJs(jsCollector)

result ++=
Expand Down

0 comments on commit 9a87255

Please sign in to comment.