[SPARK-33223][SS][UI]Structured Streaming Web UI state information #30151

Closed · wants to merge 7 commits
@@ -22,7 +22,7 @@ import java.lang.{Long => JLong}
import java.util.UUID
import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}
import scala.xml.{Node, NodeBuffer, Unparsed}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ui.UIUtils._
@@ -126,6 +126,122 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<br />
}

  def generateAggregatedStateOperators(
      query: StreamingQueryUIData,
      minBatchTime: Long,
      maxBatchTime: Long,
      jsCollector: JsCollector): NodeBuffer = {
    // This is made sure on caller side but put it here to be defensive
    require(query.lastProgress != null)
    if (query.lastProgress.stateOperators.nonEmpty) {
      val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.numRowsTotal).sum.toDouble))
      val maxNumRowsTotal = numRowsTotalData.maxBy(_._2)._2

      val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.numRowsUpdated).sum.toDouble))
      val maxNumRowsUpdated = numRowsUpdatedData.maxBy(_._2)._2

      val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
        p.stateOperators.map(_.memoryUsedBytes).sum.toDouble))
      val maxMemoryUsedBytes = memoryUsedBytesData.maxBy(_._2)._2

      val numRowsDroppedByWatermarkData = query.recentProgress
        .map(p => (parseProgressTimestamp(p.timestamp),
          p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble))
      val maxNumRowsDroppedByWatermark = numRowsDroppedByWatermarkData.maxBy(_._2)._2

      val graphUIDataForNumberTotalRows =
        new GraphUIData(
          "aggregated-num-total-state-rows-timeline",
          "aggregated-num-total-state-rows-histogram",
          numRowsTotalData,
          minBatchTime,
          maxBatchTime,
          0,
          maxNumRowsTotal,
          "records")
      graphUIDataForNumberTotalRows.generateDataJs(jsCollector)

      val graphUIDataForNumberUpdatedRows =
        new GraphUIData(
          "aggregated-num-updated-state-rows-timeline",
          "aggregated-num-updated-state-rows-histogram",
          numRowsUpdatedData,
          minBatchTime,
          maxBatchTime,
          0,
          maxNumRowsUpdated,
          "records")
      graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)

      val graphUIDataForMemoryUsedBytes =
        new GraphUIData(
          "aggregated-state-memory-used-bytes-timeline",
          "aggregated-state-memory-used-bytes-histogram",
          memoryUsedBytesData,
          minBatchTime,
          maxBatchTime,
          0,
          maxMemoryUsedBytes,
          "bytes")
      graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)

      val graphUIDataForNumRowsDroppedByWatermark =
        new GraphUIData(
          "aggregated-num-state-rows-dropped-by-watermark-timeline",
          "aggregated-num-state-rows-dropped-by-watermark-histogram",
          numRowsDroppedByWatermarkData,
          minBatchTime,
          maxBatchTime,
          0,
          maxNumRowsDroppedByWatermark,
          "records")
      graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)

      // scalastyle:off
      <tr>
        <td style="vertical-align: middle;">
          <div style="width: 160px;">
            <div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
          </div>
        </td>
        <td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
        <td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
      </tr>
      <tr>
        <td style="vertical-align: middle;">
          <div style="width: 160px;">
            <div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
          </div>
        </td>
        <td class={"aggregated-num-updated-state-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
        <td class={"aggregated-num-updated-state-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
      </tr>
      <tr>
        <td style="vertical-align: middle;">
          <div style="width: 160px;">
            <div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
          </div>
        </td>
        <td class={"aggregated-state-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
        <td class={"aggregated-state-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
      </tr>
      <tr>
        <td style="vertical-align: middle;">
          <div style="width: 160px;">
            <div><strong>Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}</strong></div>
Review comment (Contributor): I figured out during work on mine - the rows dropped by watermark are not from state. It's a bit confusing, but they're input rows for "stateful operators". I'll make a follow-up PR to correct this.
          </div>
        </td>
        <td class={"aggregated-num-state-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
        <td class={"aggregated-num-state-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
      </tr>
      // scalastyle:on
    } else {
      new NodeBuffer()
    }
  }
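
For reference, the method above simply sums each state metric across all state operators for every progress update. A minimal sketch of the same per-batch sums read from the public progress API, assuming a running StreamingQuery named `query` (not part of this change):

```scala
// Mirrors the values charted by generateAggregatedStateOperators: one
// (timestamp, sum-over-state-operators) pair per progress event.
val totalStateRows = query.recentProgress.map { p =>
  (p.timestamp, p.stateOperators.map(_.numRowsTotal).sum)
}
val stateMemoryBytes = query.recentProgress.map { p =>
  (p.timestamp, p.stateOperators.map(_.memoryUsedBytes).sum)
}
totalStateRows.foreach { case (ts, rows) =>
  println(s"$ts -> $rows total state rows")
}
```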

def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
@@ -284,6 +400,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
</td>
<td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
</tr>
{generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
</tbody>
</table>
} else {
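
Regarding the review comment above: numRowsDroppedByWatermark counts input rows of a stateful operator that arrive behind the watermark, not rows evicted from state. Below is a minimal, self-contained sketch (not part of this PR, names illustrative) of a stateful query where the metric and the new chart can be observed; with the in-order rate source the counter simply stays at 0, while a source delivering late data would make it grow.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("watermark-drop-demo")
  .getOrCreate()

// Windowed aggregation with a watermark; the rate source provides
// (timestamp, value) columns. Late rows from a real source would be counted
// in stateOperators(...).numRowsDroppedByWatermark.
val counts = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "5 seconds"))
  .count()

val query = counts.writeStream.format("noop").start()

query.awaitTermination(15000)  // let a few batches run
query.lastProgress.stateOperators.foreach { op =>
  println(s"rows dropped by watermark: ${op.numRowsDroppedByWatermark}")
}
query.stop()
```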
@@ -75,10 +75,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should not contain ("Streaming Query")

val input1 = spark.readStream.format("rate").load()
val input2 = spark.readStream.format("rate").load()
val activeQuery =
spark.readStream.format("rate").load().writeStream.format("noop").start()
input1.join(input2, "value").writeStream.format("noop").start()
val completedQuery =
spark.readStream.format("rate").load().writeStream.format("noop").start()
input1.join(input2, "value").writeStream.format("noop").start()
completedQuery.stop()
val failedQuery = spark.readStream.format("rate").load().select("value").as[Long]
.map(_ / 0).writeStream.format("noop").start()
@@ -129,6 +131,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
findAll(cssSelector("""#stat-table th""")).map(_.text).toSeq should be {
List("", "Timelines", "Histograms")
}
summaryText should contain ("Input Rate (?)")
summaryText should contain ("Process Rate (?)")
summaryText should contain ("Input Rows (?)")
summaryText should contain ("Batch Duration (?)")
summaryText should contain ("Operation Duration (?)")
summaryText should contain ("Aggregated Number Of Total State Rows (?)")
summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
summaryText should contain ("Aggregated Number Of State Rows Dropped By Watermark (?)")
}
} finally {
spark.streams.active.foreach(_.stop())
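
For anyone verifying the new rows by hand, here is a minimal sketch mirroring the rate-stream join used in the test above (assuming spark-shell, where `spark` and the web UI on http://localhost:4040 are available by default):

```scala
// The stream-stream join keeps join state, so stateOperators is non-empty and
// the four new aggregated state charts appear on the query's statistics page
// (Structured Streaming tab, then click the run id of the active query).
val left = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
val right = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

val q = left.join(right, "value").writeStream.format("noop").start()

// q.stop()  // when done inspecting the UI
```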