[SPARK-33224][SS][WEBUI] Add watermark gap information into SS UI page #30427


Closed
wants to merge 4 commits

Conversation

HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Nov 19, 2020

What changes were proposed in this pull request?

This PR proposes to add watermark gap information to the SS UI page. Please refer to the screenshots below to see what we'd like to show in the UI.

Screen Shot 2020-11-19 at 6 56 38 PM

Please note that this PR doesn't plot the watermark value itself: knowing the gap between the actual wall clock and the watermark looks more useful than the absolute value.

Why are the changes needed?

Watermark is one of the major metrics end users need to track for stateful queries. The watermark defines "when" output will be emitted in append mode, so knowing the gap between the wall clock and the watermark (derived from input data) is very helpful for setting expectations about the output.
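To make the quantity concrete, here is a dependency-free sketch (the helper name is hypothetical, not from the patch) of what "watermark gap" means: the batch timestamp minus the global watermark, both of which `StreamingQueryProgress` reports as ISO-8601 strings.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hypothetical helper mirroring what the UI plots: the gap between the
// batch timestamp and the global watermark for that batch.
def watermarkGapSeconds(batchTimestamp: String, watermark: String): Long =
  ChronoUnit.SECONDS.between(Instant.parse(watermark), Instant.parse(batchTimestamp))

// A batch triggered at 12:00:20 whose watermark is 12:00:05 shows a 15-second gap.
println(watermarkGapSeconds("2020-11-19T12:00:20Z", "2020-11-19T12:00:05Z"))
```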

Does this PR introduce any user-facing change?

Yes, SS UI query page will contain the watermark gap information.

How was this patch tested?

Basic UT added. Manually tested with two queries:

simple case

You'll see a consistent watermark gap of (15 seconds + α): 10 seconds come from the delay in the watermark definition, and 5 seconds from the trigger interval.

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{avg, max, min, window}
import spark.implicits._ // for the $"..." syntax; both imports are implicit in spark-shell

spark.conf.set("spark.sql.shuffle.partitions", "10")

val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()

query.awaitTermination()

Screen Shot 2020-11-19 at 7 00 21 PM

complicated case

This randomizes the timestamp, hence producing a random watermark gap. The gap won't be smaller than 15 seconds, as described earlier.

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions.{avg, max, min, window}
import spark.implicits._ // for the $"..." syntax; both imports are implicit in spark-shell

spark.conf.set("spark.sql.shuffle.partitions", "10")

val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("*", "CAST(CAST(timestamp AS BIGINT) - CAST((RAND() * 100000) AS BIGINT) AS TIMESTAMP) AS tsMod")
  .selectExpr("tsMod", "mod(value, 100) as mod", "value")
  .withWatermark("tsMod", "10 seconds")
  .groupBy(window($"tsMod", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()

query.awaitTermination()

Screen Shot 2020-11-19 at 6 56 47 PM

@HeartSaVioR
Contributor Author

One thing to think about: should we automatically scale the unit for the watermark gap? I just picked seconds, which looks neither too small nor too big, but if the input event time is delayed by hours the number is going to be huge. (That's definitely not a good signal, though.)
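For reference, a minimal sketch of what such auto-scaling could look like (purely hypothetical; the function and its thresholds are not part of this PR):

```scala
// Hypothetical unit scaling for the watermark gap; thresholds are arbitrary.
def scaleGap(seconds: Long): String =
  if (seconds < 120) s"$seconds s"
  else if (seconds < 7200) s"${seconds / 60} min"
  else s"${seconds / 3600} h"

println(scaleGap(15))     // small gaps stay in seconds
println(scaleGap(172800)) // hour-scale delays become readable
```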

@SparkQA

SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35952/

@gaborgsomogyi
Contributor

I've just had a quick look at this, and I don't think switching is needed. We don't do it where bytes are shown. If I were a user and suddenly saw a different axis meaning on a graph, I would be confused.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment


Looks good; I haven't tested it manually yet, so that's still to come.

query: StreamingQueryUIData,
minBatchTime: Long,
maxBatchTime: Long,
jsCollector: JsCollector): NodeBuffer = {
Contributor


Not sure what complications it would cause in generateStatTable, but I think we can return Node here.

Contributor Author


Yeah, I simply copied and pasted, and wondered why it required multiple nodes (hence &+). My bad.

Contributor Author

@HeartSaVioR HeartSaVioR Nov 20, 2020


Changed to Seq[Node] as I don't see a way to instantiate empty Node.

Contributor


Option?

Contributor Author

@HeartSaVioR HeartSaVioR Nov 23, 2020


Changing to Option[Node] would force us to wrap <tr>...</tr> in an Option, whereas leaving the return type as NodeBuffer or Seq[Node] doesn't. (scala.xml.Node has an interesting design: Node extends NodeSeq.)

I think either NodeBuffer or Seq[Node] is simpler.
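A dependency-free sketch of the trade-off (using a stand-in case class instead of `scala.xml.Node`, so this is an illustration rather than the actual code): `Seq` naturally expresses "zero or more rows" and concatenates without unwrapping, which `Option` would not.

```scala
// Stand-in for scala.xml.Node, to keep the sketch free of the scala-xml dependency.
case class Node(label: String)

// Returning Seq[Node] lets an absent row contribute nothing to the table.
def watermarkRow(hasWatermark: Boolean): Seq[Node] =
  if (hasWatermark) Seq(Node("tr")) else Seq.empty

// Rows concatenate directly; no Option unwrapping needed at the call site.
val rows = watermarkRow(true) ++ watermarkRow(false)
println(rows.size)
```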

Contributor


Now I see, and I agree it isn't worth the hassle.

<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Global Watermark Gap {SparkUIUtils.tooltip("The gap between timestamp and global watermark for the batch.", "right")}</strong></div>
Contributor


I understand that timestamp here means "now", but maybe we can be more explicit.

Contributor Author


Yes. Probably better to say batch timestamp explicitly.

@@ -51,6 +53,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val conf = new SparkConf()
.setMaster(master)
.setAppName("ui-test")
.set(SHUFFLE_PARTITIONS, 5)
Contributor


Just curious: is this to speed up the unit test so it doesn't start 200 tasks?

Contributor Author


Yes. Once I changed the active query a bit to have a watermark set, it struggled to make progress within 30 seconds (meaning the UI check failed, as there was no queryProgress). This fixed the issue.

Contributor


Had a similar problem before; just wanted to double-check. Thanks!

@SparkQA

SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35952/

Member

@dongjoon-hyun dongjoon-hyun left a comment


Nice. Thank you, @HeartSaVioR.

> One thing to think about: should we automatically scale the unit for watermark gap?

For your design choice: do we automatically scale the unit in the other graphs? If not, I agree with your decision (seconds).

@dongjoon-hyun
Member

cc @viirya

@gaborgsomogyi
Contributor

I've double-checked the graphs manually and they work fine.

@gaborgsomogyi
Contributor

do we support automatically scale the unit in the other graph

No

@SparkQA

SparkQA commented Nov 19, 2020

Test build #131348 has finished for PR 30427 at commit d82702a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 19, 2020

AFAIK we don't support auto-scaling for the other graphs. That sounds like an improvement, but I'm not a FE engineer, and we don't seem to rely on a graph library (which might provide such rich functionality) but implement our own, which makes improvements harder.

The value can go very high if you set the watermark's additional delay to a couple of hours or even more (for example, 48 hours = 172,800 seconds). If the differences in the watermark gap among batches are tiny compared to the additional delay, the graph will just keep showing a nearly horizontal line. While scaling the unit could be confusing, adjusting the min/max of the y axis might be helpful. I'm just hesitant to make that change, as all existing graphs have 0 as the minimum y-axis value.
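As a sketch of the y-axis idea (hypothetical; nothing like this is implemented in the PR): padding around the observed min/max instead of anchoring at 0 keeps small batch-to-batch variations visible even around a large gap.

```scala
// Hypothetical y-axis range: pad by 10% of the spread (at least 1 unit)
// instead of always starting the axis at 0.
def yAxisRange(values: Seq[Double]): (Double, Double) = {
  val (lo, hi) = (values.min, values.max)
  val pad = math.max((hi - lo) * 0.1, 1.0)
  (math.max(0.0, lo - pad), hi + pad)
}

// Gaps around 48 hours varying by a few seconds stay distinguishable.
println(yAxisRange(Seq(172800.0, 172805.0, 172810.0)))
```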

@HyukjinKwon
Member

cc @xuanyuanking too FYI

@dongjoon-hyun
Member

Thank you for the confirmation, @gaborgsomogyi and @HeartSaVioR !

@HeartSaVioR
Contributor Author

cc. @tdas @zsxwing @jose-torres @sarutak as well

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35983/

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35983/

@viirya
Member

viirya commented Nov 20, 2020

Watermark is the one of major metrics the end users need to track for stateful queries. Watermark defines "when" the output will be emitted for append mode, hence knowing how much gap between wall clock and watermark (input data) is very helpful to make expectation of the output.

Hmm, my question is: the watermark should be derived from event time instead of processing time (I think that's what "wall clock" means here?). In the examples, it looks like the event time equals the processing time, IIUC. So once the event time in the data differs from the processing time, is this graph still useful?

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 20, 2020

The complicated case in the manual test demonstrates the "event time processing" use case. Please take a look at the code to see how I randomize the event timestamp in the input rows.

Technically, the graph is almost meaningless with processing time, because the event timestamp will be nearly the same as the batch timestamp. Even if the query is lagging, once the next batch is launched, the event timestamps of the inputs will match the batch timestamp.

The graph will be helpful if users are either using "ingest time" (not timestamped by Spark, but timestamped when ingested into the input storage), which can show the processing lag, or "event time", which is the best case for showing the gap.
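The point above can be sketched with a toy model (plain numbers, not Spark code): take watermark = max event time seen minus the delay, and gap = batch timestamp minus watermark.

```scala
val delay = 10L // watermark delay in seconds

// gap = batchTs - watermark, where watermark = maxEventTs - delay; all in seconds.
def watermarkGap(batchTs: Long, maxEventTs: Long): Long =
  batchTs - (maxEventTs - delay)

println(watermarkGap(100L, 100L)) // processing time: events match the clock, gap stays at the delay
println(watermarkGap(100L, 40L))  // event time lagging 60 s behind: the gap reflects the lag
```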

(Figure: Event Time vs. Processing Time)

The figure is borrowed from the excellent articles below. If you haven't read them, I strongly recommend reading them, or the book "Streaming Systems".

https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/

@viirya
Member

viirya commented Nov 20, 2020

The complicated case in manual test demonstrates the use case of "event time processing". Please take a look at the code how I randomize the event timestamp in input rows.

Am I missing anything? The two code snippets are the same.

@HeartSaVioR
Contributor Author

Sorry, copy & paste error. Just updated.

@viirya
Member

viirya commented Nov 20, 2020

Technically, the graph is almost meaningless on processing time, because the event timestamp would be nearly same as batch timestamp. Even the query is lagging, once the next batch is launched, the event timestamp of inputs will be matched to the batch timestamp.

The graph will be helpful if they're either using "ingest time" (not timestamped by Spark, but timestamped when ingested to the input storage) which could show the lag of process, or using "event time" which is the best case of showing the gap.

The gap is calculated as the difference between the batch timestamp (this should be processing time, right? Because the trigger clock is SystemClock by default) and the watermark. Maybe my previous question wasn't clear. If we process historical data or simulation data, the event time could be far from the processing time. For example, if we process data from 2010 to 2019, the gap would now be current time - 2010-xx-xx...?

@HeartSaVioR
Contributor Author

HeartSaVioR commented Nov 20, 2020

If we process history data or some simulation data, the event time could be far different to processing time. For example, if we process some data from 2010 to 2019, now the gap is current time - 2010-xx-xx...?

You understand it correctly, though that's just one of the use cases. Given that they are running a "streaming workload", one of the main goals is to capture the recent outputs (e.g. trends). Watermark would still work for such historical use cases as well, but what to plot to provide value in that situation remains an open question. (What would be the "ideal" timestamp to calculate the gap against in this case?)

EDIT: for that case, adjusting the range of the y axis would probably help; otherwise we only see the line plotted as nearly horizontal, like what I commented above in #30427 (comment).

@SparkQA

SparkQA commented Nov 20, 2020

Test build #131380 has finished for PR 30427 at commit 2f1081a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

</div>
</td>
<td class="watermark-gap-timeline">{graphUIDataForWatermark.generateTimelineHtml(jsCollector)}</td>
<td class="watermark-gap-timeline">{graphUIDataForWatermark.generateHistogramHtml(jsCollector)}</td>
Member


watermark-gap-histogram?

Contributor Author


My bad. Thanks for finding!

@SparkQA

SparkQA commented Nov 20, 2020

Test build #131405 has finished for PR 30427 at commit d19fd10.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36009/

Copy link
Member

@xuanyuanking xuanyuanking left a comment


😂 Sorry for the last-minute comment again (missed the ping...). I'm also OK with addressing/discussing the comment in a follow-up if this is ready to go, since this PR does address the general cases of SS. Posting my LGTM.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Nov 23, 2020

Test build #131571 has started for PR 30427 at commit d19fd10.

@SparkQA

SparkQA commented Nov 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36172/

@SparkQA

SparkQA commented Nov 23, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36172/

@SparkQA

SparkQA commented Nov 24, 2020

Test build #131596 has finished for PR 30427 at commit d19fd10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member

sarutak commented Nov 24, 2020

retest this please.

@SparkQA

SparkQA commented Nov 24, 2020

Test build #131622 has finished for PR 30427 at commit d19fd10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sarutak
Member

sarutak commented Nov 24, 2020

retest this please.

@SparkQA

SparkQA commented Nov 24, 2020

Test build #131630 has finished for PR 30427 at commit d19fd10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

retest this please

@SparkQA

SparkQA commented Nov 24, 2020

Test build #131649 has finished for PR 30427 at commit d19fd10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Nov 24, 2020

Jenkins seems unstable. GA actually passed, so I think it should be okay.

@dongjoon-hyun
Member

Yep. The master branch is fixed via 048a982 .

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Nov 24, 2020

Test build #131692 has finished for PR 30427 at commit d19fd10.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Just rebased. I'll merge once either GitHub Actions or Jenkins is happy with the change.

@sarutak sarutak changed the title [SPARK-33224][SS] Add watermark gap information into SS UI page [SPARK-33224][SS][WEBUI] Add watermark gap information into SS UI page Nov 24, 2020
@SparkQA

SparkQA commented Nov 25, 2020

Test build #131704 has finished for PR 30427 at commit a6db726.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Thanks all for reviewing! Merged to master.

@HeartSaVioR HeartSaVioR deleted the SPARK-33224 branch November 25, 2020 04:13
HeartSaVioR pushed a commit that referenced this pull request Dec 1, 2020
…d if built with Scala 2.13

### What changes were proposed in this pull request?

This PR fixes an issue that the histogram and timeline aren't rendered in the `Streaming Query Statistics` page if we built Spark with Scala 2.13.

![before-fix-the-issue](https://user-images.githubusercontent.com/4736016/100612855-f543d700-3356-11eb-90d9-ede57b8b3f4f.png)
![NaN_Error](https://user-images.githubusercontent.com/4736016/100612879-00970280-3357-11eb-97cf-43978bbe2d3a.png)

The reason is [`maxRecordRate` can be `NaN`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L371) for Scala 2.13.

The `NaN` is the result of [`query.recentProgress.map(_.inputRowsPerSecond).max`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala#L372) when the first element of `query.recentProgress.map(_.inputRowsPerSecond)` is `NaN`.
Actually, the comparison logic for the `Double` type changed in Scala 2.13.
scala/bug#12107
scala/scala#6410

So this issue happens as of Scala 2.13.

The root cause of the `NaN` is [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala#L164).
This `NaN` seems to be the initial value of `inputTimeSec`, so I think `Double.PositiveInfinity` is more suitable than `NaN`, and this change resolves the issue.
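The arithmetic behind that choice can be checked directly (a sketch of the rationale, not the patch itself): dividing a row count by `NaN` propagates `NaN` into the derived rate, while dividing by `Double.PositiveInfinity` yields a finite 0.0 that renders and compares sanely.

```scala
val numRows = 1000.0

println(numRows / Double.NaN)              // NaN poisons every derived rate
println(numRows / Double.PositiveInfinity) // 0.0: "no data yet" stays finite
```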

### Why are the changes needed?

To make sure we can use the histogram/timeline with Scala 2.13.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

First, I built with the following commands.
```
$ dev/change-scala-version.sh 2.13
$ build/sbt -Phive -Phive-thriftserver -Pscala-2.13 package
```

Then, I ran the following query (borrowed from #30427).
```
import org.apache.spark.sql.streaming.Trigger
val query = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .option("rampUpTime", "10s")
  .load()
  .selectExpr("*", "CAST(CAST(timestamp AS BIGINT) - CAST((RAND() * 100000) AS BIGINT) AS TIMESTAMP) AS tsMod")
  .selectExpr("tsMod", "mod(value, 100) as mod", "value")
  .withWatermark("tsMod", "10 seconds")
  .groupBy(window($"tsMod", "1 minute", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .start()
```

Finally, I confirmed that the timeline and histogram are rendered.
![after-fix-the-issue](https://user-images.githubusercontent.com/4736016/100612736-c9285600-3356-11eb-856d-7e53cc656c36.png)


Closes #30546 from sarutak/ss-nan.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
8 participants