[SPARK-33224][SS][WEBUI] Add watermark gap information into SS UI page #30427
One thing to think about: should we automatically scale the unit for the watermark gap? I just picked seconds, which looks neither too small nor too big, but if the input event time is delayed by hours the values are going to be quite large. (That's definitely not a good sign, though.)
Kubernetes integration test starting |
I've just had a quick look at this, and I don't think switching units is needed. We don't do this where bytes are shown. If I were a user looking at a graph and the meaning of the axis suddenly changed, I would be confused.
Looks good. I haven't tested it manually yet, so that is still to come.
query: StreamingQueryUIData,
minBatchTime: Long,
maxBatchTime: Long,
jsCollector: JsCollector): NodeBuffer = {
Not sure what complications it would mean in `generateStatTable`, but I think we can give back `Node` here.
Yeah, I simply copied and pasted, and struggled to see why it requires multiple nodes (hence `&+`). My bad.
Changed to `Seq[Node]`, as I don't see a way to instantiate an empty `Node`.
Option?
Changing to `Option[Node]` forces us to wrap `<tr>...</tr>` in an `Option`, whereas leaving the return type as `NodeBuffer` or `Seq[Node]` doesn't. (`scala.xml.Node` seems to have an interesting implementation: `Node` extends `NodeSeq`.) I think either `NodeBuffer` or `Seq[Node]` is simpler.
Now I see, and I agree it isn't worth the hassle.
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Global Watermark Gap {SparkUIUtils.tooltip("The gap between timestamp and global watermark for the batch.", "right")}</strong></div>
I understand that `timestamp` here means now, but maybe we can be more explicit.
Yes. Probably better to say `batch timestamp` explicitly.
@@ -51,6 +53,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val conf = new SparkConf()
.setMaster(master)
.setAppName("ui-test")
.set(SHUFFLE_PARTITIONS, 5)
Just curious, is this to speed up the unit test not to start 200 tasks?
Yes. Once I changed the active query a bit to have a watermark set, it struggled to make progress within 30 seconds (meaning the UI check failed because there was no queryProgress) and the test failed. This fixed the issue.
Had a similar problem before, just wanted to double check. Thanks!
Kubernetes integration test status success |
Nice. Thank you, @HeartSaVioR .
Regarding your design choice, do we support automatically scaling the unit in the other graphs? If not, I agree with your decision (`sec`).
One thing to think about: should we automatically scale the unit for watermark gap?
cc @viirya |
I've double checked the graphs manually and it works fine. |
No |
Test build #131348 has finished for PR 30427 at commit
|
AFAIK we don't support auto scaling for the other graphs. That sounds like an improvement, but I'm not a FE engineer, and we don't even seem to rely on a graph library (which may provide rich functionality) but implement our own, which makes improvements harder. The value can go very high if you set the additional delay of the watermark to a couple of hours or even more (2 hours = 7,200 seconds). If the differences in watermark gap among batches are tiny compared to the additional delay, the graph will just keep showing a "nearly" horizontal line. While scaling the unit could confuse us, adjusting the min/max of the y axis might be helpful. I'm just hesitant to make the change, though, as I see all existing graphs have 0 as the min value of the y axis.
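To make the "nearly horizontal line" concern concrete, here is a minimal sketch that measures how much of the y axis a series actually occupies. The object name, the helper `occupiedFraction`, and the sample gap values (a 2-hour delay, i.e. 7,200 seconds, plus a few seconds of jitter) are all hypothetical and are not taken from the Spark UI code.

```scala
object WatermarkGapAxis {
  /** Fraction of the y-axis height that the plotted values span. */
  def occupiedFraction(values: Seq[Double], axisMin: Double, axisMax: Double): Double = {
    require(values.nonEmpty && axisMax > axisMin, "need data and a non-empty axis range")
    (values.max - values.min) / (axisMax - axisMin)
  }

  // Hypothetical gaps in seconds: 7,200 s of watermark delay plus small per-batch jitter.
  val gaps: Seq[Double] = Seq(7215.0, 7218.0, 7216.0, 7221.0, 7217.0)

  // Y axis starting at 0, as the existing graphs are described in this thread.
  val zeroBased: Double = occupiedFraction(gaps, 0.0, gaps.max)

  // Alternative floated in this thread: fit the axis to the observed min/max.
  val fitted: Double = occupiedFraction(gaps, gaps.min, gaps.max)
}
```

With these made-up numbers the zero-based axis uses well under 1% of its height for the variation, while the fitted axis uses all of it. Whether fitting the axis is worth diverging from the other graphs is the open design question here.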
cc @xuanyuanking too FYI |
Thank you for the confirmation, @gaborgsomogyi and @HeartSaVioR ! |
cc. @tdas @zsxwing @jose-torres @sarutak as well |
Kubernetes integration test starting |
Kubernetes integration test status failure |
Hmm, my question is: the watermark should be derived from event time instead of processing time (I think it should be wall clock here?). In the examples, it looks like the event time is the same as the processing time, IIUC. So once the event time from the data differs from the processing time, is this graph still useful?
Technically, the graph is almost meaningless with processing time, because the event timestamp would be nearly the same as the batch timestamp. Even if the query is lagging, once the next batch is launched, the event timestamps of the inputs will match the batch timestamp. The graph will be helpful if they're using either "ingest time" (not timestamped by Spark, but timestamped when ingested into the input storage), which could show the processing lag, or "event time", which is the best case for showing the gap. The figure is borrowed from the excellent articles below. If you haven't read them, I strongly recommend reading them, or reading the book "Streaming Systems". https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
Am I missing anything? The two code snippets are the same.
Sorry, copy & paste error. Just updated. |
The gap is calculated by the difference between batch timestamp (this should be processing time, right? Because the trigger clock is |
You understand it correctly, though that's just one of the use cases. Given they are running a "streaming workload", one of the main goals is to capture recent outputs (e.g. trends). Watermark would still work for such historical use cases as well, but what to plot to provide value even in that situation remains a question. (What would be the "ideal" timestamp to calculate the gap in this case?) EDIT: for that case, adjusting the range of the y axis would probably help; otherwise we only see the "line" plotted as a nearly horizontal line, like what I commented above in #30427 (comment).
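The gap discussed in this thread can be sketched as the batch timestamp minus the global watermark, expressed in seconds. The snippet below is only an illustration using `java.time`; the object and function names are hypothetical, and it assumes both values arrive as ISO-8601 UTC strings (in Spark they would presumably come from the query progress, which is an assumption of this sketch).

```scala
import java.time.{Duration, Instant}

object WatermarkGap {
  /**
   * Gap in seconds between a batch timestamp and the global watermark.
   * Both arguments are assumed to be ISO-8601 UTC strings such as
   * "2020-11-19T10:00:15.000Z".
   */
  def gapSeconds(batchTimestamp: String, watermark: String): Double = {
    val batch = Instant.parse(batchTimestamp)
    val wm = Instant.parse(watermark)
    // Duration.between(start, end) is end minus start, so this is batch - watermark.
    Duration.between(wm, batch).toMillis / 1000.0
  }
}
```

For example, `WatermarkGap.gapSeconds("2020-11-19T10:00:15.000Z", "2020-11-19T10:00:00.000Z")` yields `15.0`. When event time trails wall clock by an hour, the same call returns a value above 3,600, which is the "historical data" situation raised above.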
Test build #131380 has finished for PR 30427 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala
</div>
</td>
<td class="watermark-gap-timeline">{graphUIDataForWatermark.generateTimelineHtml(jsCollector)}</td>
<td class="watermark-gap-timeline">{graphUIDataForWatermark.generateHistogramHtml(jsCollector)}</td>
watermark-gap-histogram?
My bad. Thanks for finding!
Test build #131405 has finished for PR 30427 at commit
|
Kubernetes integration test starting |
😂 Sorry for the last-minute comment again (I missed the ping...). I'm also OK with addressing/discussing the comment in a follow-up if this is ready to go and this PR addresses the general cases of SS. Posting my LGTM.
retest this, please |
Test build #131571 has started for PR 30427 at commit |
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #131596 has finished for PR 30427 at commit
|
retest this please. |
Test build #131622 has finished for PR 30427 at commit
|
retest this please. |
Test build #131630 has finished for PR 30427 at commit
|
retest this please |
Test build #131649 has finished for PR 30427 at commit
|
Jenkins seems unstable. GA actually passed. I think it should be okay.
Yep. The master branch is fixed via 048a982 . |
Retest this please. |
Test build #131692 has finished for PR 30427 at commit
|
d19fd10
to
a6db726
Compare
Just rebased. I'll merge once either GitHub Actions or Jenkins is happy with the change.
Test build #131704 has finished for PR 30427 at commit
|
Thanks all for reviewing! Merged to master. |
…d if built with Scala 2.13

### What changes were proposed in this pull request?

This PR fixes an issue that the histogram and timeline aren't rendered in the `Streaming Query Statistics` page if we built Spark with Scala 2.13.

The reason is [`maxRecordRate` can be `NaN`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L371) for Scala 2.13. The `NaN` is the result of [`query.recentProgress.map(_.inputRowsPerSecond).max`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L372) when the first element of `query.recentProgress.map(_.inputRowsPerSecond)` is `NaN`. Actually, the comparison logic for `Double` type was changed in Scala 2.13. scala/bug#12107 scala/scala#6410 So this issue happens as of Scala 2.13.

The root cause of the `NaN` is [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L164). This `NaN` seems to be an initial value of `inputTimeSec` so I think `Double.PositiveInfinity` is suitable rather than `NaN` and this change can resolve this issue.

### Why are the changes needed?

To make sure we can use the histogram/timeline with Scala 2.13.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

First, I built with the following commands.

```
$ /dev/change-scala-version.sh 2.13
$ build/sbt -Phive -Phive-thriftserver -Pscala-2.13 package
```

Then, ran the following query (this is brought from #30427 ).

```
import org.apache.spark.sql.streaming.Trigger

val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("*", "CAST(CAST(timestamp AS BIGINT) - CAST((RAND() * 100000) AS BIGINT) AS TIMESTAMP) AS tsMod")
  .selectExpr("tsMod", "mod(value, 100) as mod", "value")
  .withWatermark("tsMod", "10 seconds")
  .groupBy(window($"tsMod", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()
```

Finally, I confirmed that the timeline and histogram are rendered.

Closes #30546 from sarutak/ss-nan.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
This PR proposes to add the watermark gap information to the SS UI page. Please refer to the screenshots below to see what we'd like to show in the UI.
Please note that this PR doesn't plot the watermark value - knowing the gap between actual wall clock and watermark looks more useful than the absolute value.
Why are the changes needed?
Watermark is one of the major metrics that end users need to track for stateful queries. The watermark defines "when" the output will be emitted in append mode, hence knowing how large the gap between wall clock and watermark (input data) is helps a lot in setting expectations for the output.
Does this PR introduce any user-facing change?
Yes, the SS UI query page will contain the watermark gap information.
How was this patch tested?
Basic UT added. Manually tested with two queries:
You'll see a consistent watermark gap of (15 seconds + a): 10 seconds come from the delay in the watermark definition, and 5 seconds come from the trigger interval.
This randomizes the timestamp, hence producing a random watermark gap. It won't be smaller than 15 seconds, as I described earlier.
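The 15-second floor can be reproduced with a deliberately simplified model. It assumes that event time equals processing time, that the watermark used by a batch is the maximum event time seen by the previous batch minus the delay, and that batches fire exactly on the trigger interval with no processing latency (so the "+ a" term is ignored). The names `ExpectedGapModel`, `Batch`, and `simulate` are hypothetical and only serve this sketch.

```scala
object ExpectedGapModel {
  /** One simulated micro-batch; all times are seconds since the query started. */
  final case class Batch(timestampSec: Long, watermarkSec: Long) {
    def gapSec: Long = timestampSec - watermarkSec
  }

  /**
   * Simulates `batches` triggers under the assumptions stated above:
   * batch k runs at k * triggerSec and uses a watermark derived from the
   * max event time of batch k - 1, which is taken to be that batch's start time.
   */
  def simulate(delaySec: Long, triggerSec: Long, batches: Int): Seq[Batch] =
    (1 to batches).map { k =>
      val timestamp = k * triggerSec
      val previousMaxEventTime = (k - 1) * triggerSec
      Batch(timestamp, previousMaxEventTime - delaySec)
    }
}
```

Calling `ExpectedGapModel.simulate(delaySec = 10, triggerSec = 5, batches = 4)` gives a gap of 15 seconds for every batch, matching the 10 + 5 breakdown described above. A real query adds scheduling and processing latency on top of that.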