[SPARK-33224][SS][WEBUI] Add watermark gap information into SS UI page #30427

Closed
wants to merge 4 commits into from
@@ -140,6 +140,58 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
    <br />
  }

  def generateWatermark(
      query: StreamingQueryUIData,
      minBatchTime: Long,
      maxBatchTime: Long,
      jsCollector: JsCollector): Seq[Node] = {
    // This is made sure on caller side but put it here to be defensive
    require(query.lastProgress != null)
    if (query.lastProgress.eventTime.containsKey("watermark")) {
      val watermarkData = query.recentProgress.flatMap { p =>
        val batchTimestamp = parseProgressTimestamp(p.timestamp)
        val watermarkValue = parseProgressTimestamp(p.eventTime.get("watermark"))
        if (watermarkValue > 0L) {
          // seconds
          Some((batchTimestamp, ((batchTimestamp - watermarkValue) / 1000.0)))

@xuanyuanking (Member) commented on Nov 23, 2020:

Fully agree that knowing the gap between the actual wall clock and the watermark is more useful than the absolute value. And thanks for the super useful watermark info!

I only have one concern, the same one @viirya raised in #30427 (comment). Maybe we can address both scenarios (event time is/isn't close to clock time) by using the max event time received in this batch? I mean:

         if (watermarkValue > 0L) {
           // seconds
-          Some((batchTimestamp, ((batchTimestamp - watermarkValue) / 1000.0)))
+          val maxEventTime = parseProgressTimestamp(p.eventTime.get("max"))
+          Some((batchTimestamp, (maxEventTime - watermarkValue) / 1000.0))
         } else {
           None
         }

Of course this proposal changes the meaning of the chart: it would represent the gap between the latest event and the global watermark for the batch. And we might need to add more explanation here, since the number will be negative when all the data in the current batch is later than the current watermark (it can be reproduced with the complex demo provided). WDYT? @HeartSaVioR @viirya
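
To make the difference concrete, here is a minimal numeric sketch; every name and epoch-millisecond value below is made up, standing in for the timestamp, eventTime("max"), and eventTime("watermark") fields of a single progress:

    // Hypothetical values (epoch millis) for one batch.
    val batchTimestamp = 1600000060000L  // wall clock when the batch ran
    val maxEventTime   = 1600000052000L  // eventTime("max"): latest event seen in the batch
    val watermarkValue = 1600000055000L  // eventTime("watermark"): current global watermark

    // Definition in this PR: wall clock vs. global watermark.
    val wallClockGap = (batchTimestamp - watermarkValue) / 1000.0  // 5.0 seconds

    // Proposed alternative: latest event vs. global watermark. Negative when
    // every event in the batch is already older than the watermark (all data late).
    val eventTimeGap = (maxEventTime - watermarkValue) / 1000.0    // -3.0 seconds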

@HeartSaVioR (Contributor, Author) commented on Nov 23, 2020:

I'm sorry, but when you pick the max event time, you're discarding the gap between the wall clock and event time. The intention of showing the watermark gap is to show the "gap" between the event time of the events (which ultimately produce the watermark) and the wall clock.

Does this answer your comment?

@HeartSaVioR (Contributor, Author) commented on Nov 23, 2020:

We could probably represent various aspects if we plotted all the points (wall clock, max event time, global watermark, and also min event time, etc.). As I'm not a frontend engineer and don't want to jump in and hack around the graph code, I stuck with one line and decided to plot the gap between the wall clock and the global watermark.

If someone is interested in experimenting with the graph, plotting all the lines, finding the relations between them, and deciding which lines to keep would be valuable.

Member commented:

Yeah, that depends on how we define the watermark gap here.
These two definitions will not show a difference in the ideal case. My point is that, since the watermark is derived from event time, it seems to make more sense to use event time on both sides of the gap.

Member commented:

"we can represent various aspects if we plot all the points"

Yeah, agreed. If a multi-line chart were available, it might be helpful here! :)

Actually, this idea came up when I was thinking about the ideal line for historical streaming data, which should use event time rather than the current clock time to represent processing time.

Anyway, I just wanted to post a different interpretation of the watermark gap here; the current changes LGTM. If others think it's worth having another, event-based gap, maybe we can do it in a follow-up.

Author (Contributor) commented:

More precisely, it depends on how we define the "processing time" (the y axis) in the graph in #30427 (comment).

A query which pulls recent events is expected to have processing time close to the wall clock. That only breaks when we deal with historical data, which doesn't have the "ideal" processing time. One approach to a rough guess would be tracking event time, but given that Spark takes the max event time to calculate the watermark (while other engines take the min event time), the gap is likely to be pretty similar across batches.

In the historical case, as well as the real-time case (since Spark picks the max event time), tracking the gap between the global watermark and the min event time would be more helpful, as we can at least see whether the watermark delay is enough to cover the min event time of the next batch. This is pretty specific to Spark's case, though.

(So, as I said, there are several useful lines to plot which could be compared against each other to produce meaning. I just don't want to take the step of turning my life toward frontend engineering.)
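
If someone did want to plot that Spark-specific line, a rough sketch could look like the following. This is a hypothetical helper, not part of this PR; it assumes the page's existing parseProgressTimestamp helper and the "min"/"watermark" keys of StreamingQueryProgress.eventTime:

    // Hypothetical helper: gap between the min event time of a batch and the
    // global watermark, in seconds. Positive means the oldest event in the batch
    // is still ahead of the watermark; negative means the batch contained events
    // older than the watermark.
    def minEventTimeGap(p: StreamingQueryProgress): Option[Double] = {
      val eventTime = p.eventTime
      if (eventTime.containsKey("watermark") && eventTime.containsKey("min")) {
        val watermark = parseProgressTimestamp(eventTime.get("watermark"))
        val minEvent = parseProgressTimestamp(eventTime.get("min"))
        if (watermark > 0L && minEvent > 0L) {
          Some((minEvent - watermark) / 1000.0)
        } else {
          None
        }
      } else {
        None
      }
    }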

@HeartSaVioR (Contributor, Author) commented on Nov 23, 2020:

OK, I spent too much effort writing my previous comment and posted it too late. In short, if we pick a second metric to compare with the global watermark, it should be min instead of max. If Spark instead picked the min event time to construct the watermark, we should pick max, to see how much the output is lagging due to slow watermark advance.

Member commented:

Thanks @xuanyuanking for raising this discussion.

"In short, if we pick a second metric to compare with the global watermark, it should be min instead of max. If Spark instead picked the min event time to construct the watermark, we should pick max, to see how much the output is lagging due to slow watermark advance."

I agree. Actually, my first thought was to use the min event time instead of the batch time in this graph.

I think an ideal approach would let the user select a different base time for constructing this graph, e.g. min event time or batch time. I am not sure whether the current UI component supports that kind of feature. But for the current change, I think it is good enough for all use cases except those where event time is far from clock time. That is why I gave a +1 for this PR.
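
As a thought experiment only (not something this PR implements, and again assuming the same parseProgressTimestamp helper and eventTime keys), the gaps for several base times could be collected in one pass over recentProgress, so a future multi-line or selectable chart would have everything it needs:

    // Hypothetical: one (batchTimestamp -> gaps) entry per progress, with the
    // watermark gap computed against several base times at once.
    val gapSeries: Seq[(Long, Map[String, Double])] = query.recentProgress.toSeq.flatMap { p =>
      val eventTime = p.eventTime
      if (eventTime.containsKey("watermark")) {
        val batchTimestamp = parseProgressTimestamp(p.timestamp)
        val watermark = parseProgressTimestamp(eventTime.get("watermark"))
        // Gap against the batch timestamp (what this PR plots)...
        val batchGap = "batchTimestamp" -> ((batchTimestamp - watermark) / 1000.0)
        // ...plus gaps against min/max event time, when they are reported.
        val eventTimeGaps = Seq("min", "max").flatMap { key =>
          if (eventTime.containsKey(key)) {
            Some(key -> ((parseProgressTimestamp(eventTime.get(key)) - watermark) / 1000.0))
          } else {
            None
          }
        }
        Some(batchTimestamp -> (eventTimeGaps :+ batchGap).toMap)
      } else {
        None
      }
    }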

Author (Contributor) commented:

Yeah, it would be great if someone familiar with the frontend volunteered to revise the graphs. I actually think the other existing graphs are not that ideal either (in terms of auto-scaling, plotting multiple lines, and better units, values, and tooltips).

        } else {
          None
        }
      }

      if (watermarkData.nonEmpty) {
        val maxWatermark = watermarkData.maxBy(_._2)._2
        val graphUIDataForWatermark =
          new GraphUIData(
            "watermark-gap-timeline",
            "watermark-gap-histogram",
            watermarkData,
            minBatchTime,
            maxBatchTime,
            0,
            maxWatermark,
            "seconds")
        graphUIDataForWatermark.generateDataJs(jsCollector)

        // scalastyle:off
        <tr>
          <td style="vertical-align: middle;">
            <div style="width: 160px;">
              <div><strong>Global Watermark Gap {SparkUIUtils.tooltip("The gap between batch timestamp and global watermark for the batch.", "right")}</strong></div>
            </div>
          </td>
          <td class="watermark-gap-timeline">{graphUIDataForWatermark.generateTimelineHtml(jsCollector)}</td>
          <td class="watermark-gap-histogram">{graphUIDataForWatermark.generateHistogramHtml(jsCollector)}</td>
        </tr>
        // scalastyle:on
      } else {
        Seq.empty[Node]
      }
    } else {
      Seq.empty[Node]
    }
  }

  def generateAggregatedStateOperators(
      query: StreamingQueryUIData,
      minBatchTime: Long,

@@ -465,6 +517,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
            </td>
            <td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
          </tr>
          {generateWatermark(query, minBatchTime, maxBatchTime, jsCollector)}
          {generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
        </tbody>
      </table>

@@ -31,8 +31,10 @@ import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
import org.apache.spark.sql.LocalSparkSession.withSparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.functions.{window => windowFn, _}
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}
import org.apache.spark.ui.SparkUICssErrorHandler

class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with BeforeAndAfterAll {

@@ -52,6 +54,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName("ui-test")
      .set(SHUFFLE_PARTITIONS, 5)

Contributor commented:

Just curious, is this to speed up the unit test so it doesn't start 200 tasks?

Author (Contributor) commented:

Yes. Once I changed the active query a bit so that a watermark is set, it struggled to make progress within 30 seconds (meaning the UI check failed because there was no queryProgress) and the test failed. This fixed the issue.
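
For context, spark.sql.shuffle.partitions defaults to 200, so the stateful aggregation would otherwise schedule 200 shuffle tasks per micro-batch. A minimal standalone sketch of the equivalent setting by its config key (illustrative only, with a made-up app name, not part of this suite):

    import org.apache.spark.sql.SparkSession

    // Hypothetical: lower the shuffle partition count when building a session directly.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("watermark-ui-demo")
      .config("spark.sql.shuffle.partitions", "5")  // default is 200
      .getOrCreate()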

Contributor commented:

I had a similar problem before, so I just wanted to double check. Thanks!

      .set(UI_ENABLED, true)
      .set(UI_PORT, 0)
      .set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes"))

@@ -79,10 +82,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B

    val input1 = spark.readStream.format("rate").load()
    val input2 = spark.readStream.format("rate").load()
    val input3 = spark.readStream.format("rate").load()
    val activeQuery =
      input1.join(input2, "value").writeStream.format("noop").start()
      input1.selectExpr("timestamp", "mod(value, 100) as mod", "value")
        .withWatermark("timestamp", "0 second")
        .groupBy(windowFn($"timestamp", "10 seconds", "2 seconds"), $"mod")
        .agg(avg("value").as("avg_value"))
        .writeStream.format("noop").trigger(Trigger.ProcessingTime("5 seconds")).start()
    val completedQuery =
      input1.join(input2, "value").writeStream.format("noop").start()
      input2.join(input3, "value").writeStream.format("noop").start()
    completedQuery.stop()
    val failedQuery = spark.readStream.format("rate").load().select("value").as[Long]
      .map(_ / 0).writeStream.format("noop").start()

@@ -138,6 +146,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
    summaryText should contain ("Input Rows (?)")
    summaryText should contain ("Batch Duration (?)")
    summaryText should contain ("Operation Duration (?)")
    summaryText should contain ("Global Watermark Gap (?)")
    summaryText should contain ("Aggregated Number Of Total State Rows (?)")
    summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
    summaryText should contain ("Aggregated State Memory Used In Bytes (?)")