Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33287][SS][UI]Expose state custom metrics information on SS UI #30336

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,16 @@ object StaticSQLConf {
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(-1)

val ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST =
buildStaticConf("spark.sql.streaming.ui.enabledCustomMetricList")
.internal()
.doc("Configures a list of custom metrics on Structured Streaming UI, which are enabled. " +
"The list contains the name of the custom metrics separated by comma. In aggregation" +
"only sum used. The list of supported custom metrics is state store provider specific " +
"and it can be found out for example from query progress log entry.")
.version("3.1.0")
.stringConf
.toSequence
.createWithDefault(Nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit

private lazy val metricStateOnCurrentVersionSizeBytes: StateStoreCustomSizeMetric =
StateStoreCustomSizeMetric("stateOnCurrentVersionSizeBytes",
"estimated size of state only on current version")
"estimated size of state only on current version", "bytes")

private lazy val metricLoadedMapCacheHit: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheHitCount",
"count of cache hit on states cache in provider")
"count of cache hit on states cache in provider", "hit count")

private lazy val metricLoadedMapCacheMiss: StateStoreCustomMetric =
StateStoreCustomSumMetric("loadedMapCacheMissCount",
"count of cache miss on states cache in provider")
"count of cache miss on states cache in provider", "hit count")

private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,15 @@ object StateStoreMetrics {
trait StateStoreCustomMetric {
def name: String
def desc: String
def unit: String
}

case class StateStoreCustomSumMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomSumMetric(name: String, desc: String, unit: String)
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved
extends StateStoreCustomMetric
case class StateStoreCustomSizeMetric(name: String, desc: String, unit: String)
extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String, unit: String)
extends StateStoreCustomMetric

/**
* An exception thrown when an invalid UnsafeRow is detected in state store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,11 @@ class SymmetricHashJoinStateManager(
keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once
keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
keyWithIndexToValueMetrics.customMetrics.map {
case (s @ StateStoreCustomSumMetric(_, desc), value) =>
case (s @ StateStoreCustomSumMetric(_, desc, _), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomSizeMetric(_, desc), value) =>
case (s @ StateStoreCustomSizeMetric(_, desc, _), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s @ StateStoreCustomTimingMetric(_, desc), value) =>
case (s @ StateStoreCustomTimingMetric(_, desc, _), value) =>
s.copy(desc = newDesc(desc)) -> value
case (s, _) =>
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
provider.supportedCustomMetrics.map {
case StateStoreCustomSumMetric(name, desc) =>
case StateStoreCustomSumMetric(name, desc, _) =>
name -> SQLMetrics.createMetric(sparkContext, desc)
case StateStoreCustomSizeMetric(name, desc) =>
case StateStoreCustomSizeMetric(name, desc, _) =>
name -> SQLMetrics.createSizeMetric(sparkContext, desc)
case StateStoreCustomTimingMetric(name, desc) =>
case StateStoreCustomTimingMetric(name, desc, _) =>
name -> SQLMetrics.createTimingMetric(sparkContext, desc)
}.toMap
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ import java.lang.{Long => JLong}
import java.util.UUID
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.xml.{Node, NodeBuffer, Unparsed}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider
import org.apache.spark.sql.internal.SQLConf.STATE_STORE_PROVIDER_CLASS
import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}

private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {

// State store provider implementation mustn't do any heavyweight initialiation in constructor
// but in its init method.
private val supportedMetrics = StateStoreProvider.create(
parent.parent.conf.get(STATE_STORE_PROVIDER_CLASS)).supportedCustomMetrics
logDebug(s"Supported metrics: $supportedMetrics")

def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
// scalastyle:off
<script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
Expand Down Expand Up @@ -199,16 +209,17 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
"records")
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)

// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
</tr>
val result =
// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
Expand Down Expand Up @@ -236,12 +247,62 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<td class={"aggregated-num-state-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on
// scalastyle:on

result ++= generateAggregatedCustomMetrics(query, minBatchTime, maxBatchTime, jsCollector)
result
} else {
new NodeBuffer()
}
}

def generateAggregatedCustomMetrics(
query: StreamingQueryUIData,
minBatchTime: Long,
maxBatchTime: Long,
jsCollector: JsCollector): NodeBuffer = {
val result: NodeBuffer = new NodeBuffer

// This is made sure on caller side but put it here to be defensive
require(query.lastProgress.stateOperators.nonEmpty)
val enabledCustomMetrics = parent.parent.conf.get(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST)
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved
logDebug(s"Enabled custom metrics: $enabledCustomMetrics")
query.lastProgress.stateOperators.head.customMetrics.keySet().asScala
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved
.filter(enabledCustomMetrics.contains(_)).map { metricName =>
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved
val data = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.customMetrics.get(metricName).toDouble).sum))
val max = data.maxBy(_._2)._2
val metric = supportedMetrics.find(_.name == metricName).get

val graphUIData =
new GraphUIData(
s"aggregated-$metricName-timeline",
s"aggregated-$metricName-histogram",
data,
minBatchTime,
maxBatchTime,
0,
max,
metric.unit)
graphUIData.generateDataJs(jsCollector)

result ++=
// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 240px;">
<div><strong>Aggregated Custom Metric {s"$metricName"} {SparkUIUtils.tooltip(metric.desc, "right")}</strong></div>
</div>
</td>
<td class={s"aggregated-$metricName-timeline"}>{graphUIData.generateTimelineHtml(jsCollector)}</td>
<td class={s"aggregated-$metricName-histogram"}>{graphUIData.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on
}

result
}

def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
import org.apache.spark.sql.LocalSparkSession.withSparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.ui.SparkUICssErrorHandler

Expand All @@ -53,6 +54,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.setAppName("ui-test")
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
.set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes"))
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val spark = SparkSession.builder().master(master).config(conf).getOrCreate()
assert(spark.sparkContext.ui.isDefined)
Expand Down Expand Up @@ -140,6 +142,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
summaryText should contain ("Aggregated Number Of State Rows Dropped By Watermark (?)")
summaryText should contain ("Aggregated Custom Metric stateOnCurrentVersionSizeBytes" +
" (?)")
summaryText should not contain ("Aggregated Custom Metric loadedMapCacheHitCount (?)")
summaryText should not contain ("Aggregated Custom Metric loadedMapCacheMissCount (?)")
}
} finally {
spark.streams.active.foreach(_.stop())
Expand Down