[MINOR] Page rename (Demo: Streaming Windowed Aggregation)
jaceklaskowski committed Dec 22, 2022
1 parent fbb60d7 commit 69f1eed
Showing 4 changed files with 14 additions and 51 deletions.
docs/demo/index.md (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ hide:
* [groupByKey Streaming Aggregation in Update Mode](groupByKey-count-Update.md)
* [Internals of FlatMapGroupsWithStateExec Physical Operator](spark-sql-streaming-demo-FlatMapGroupsWithStateExec.md)
* [Kafka Data Source](kafka-data-source.md)
* [Streaming Aggregation](streaming-aggregation.md)
* [Streaming Windowed Aggregation](streaming-windowed-aggregation.md)
* [RocksDB State Store for Streaming Aggregation](rocksdb-state-store-for-streaming-aggregation.md)
* [Streaming Query for Running Counts (Socket Source and Complete Output Mode)](groupBy-running-count-complete.md)
* [Streaming Watermark](streaming-watermark.md)
@@ -3,48 +3,20 @@ hide:
- navigation
---

# Demo: Streaming Aggregation
# Demo: Streaming Windowed Aggregation

This demo shows a streaming query with a [streaming aggregation](../streaming-aggregation/index.md) (with the [Dataset.groupBy](../operators/groupBy.md) operator) that processes data from Kafka (using the [Kafka Data Source](../kafka/index.md)).

!!! note
    Please start a [Kafka cluster](kafka-data-source.md#start-kafka-cluster) and [spark-shell](kafka-data-source.md#start-spark-shell) as described in [Demo: Kafka Data Source](kafka-data-source.md).

## Reset numShufflePartitions

This step makes debugging easier since there is then a state store for just one partition.

```scala
val numShufflePartitions = 1
import org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS
spark.sessionState.conf.setConf(SHUFFLE_PARTITIONS, numShufflePartitions)

assert(spark.sessionState.conf.numShufflePartitions == numShufflePartitions)
```
The source code of this demo is in the [spark-examples](https://github.com/jaceklaskowski/spark-examples) repository.

## Load Events from Kafka

```scala
import org.apache.spark.sql.functions.split

val events = spark
  .readStream
  .format("kafka")
  .option("subscribe", "demo.streaming-aggregation")
  .option("kafka.bootstrap.servers", ":9092")
  .load
  .select($"value" cast "string")
  .withColumn("tokens", split($"value", ","))
  .withColumn("id", 'tokens(0))
  .withColumn("v", 'tokens(1) cast "int")
  .withColumn("second", 'tokens(2) cast "long")
  .withColumn("event_time", 'second cast "timestamp") // <-- Event time has to be a timestamp
  .select("id", "v", "second", "event_time")
```

??? note "FIXME Consider JSON format for values"
    JSONified values would make more sense. It'd certainly make the demo more verbose (extra JSON-specific "things") but perhaps would ease building a connection between events on the command line and their DataFrame representation.
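
    A rough sketch of that idea (the value schema and field names below are assumptions for illustration, not part of the demo):

    ```scala
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    // Hypothetical schema for JSON-encoded values, e.g. {"id":"a","v":1,"second":10}
    val eventSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("v", IntegerType),
      StructField("second", LongType)))

    val eventsFromJson = spark
      .readStream
      .format("kafka")
      .option("subscribe", "demo.streaming-aggregation")
      .option("kafka.bootstrap.servers", ":9092")
      .load
      .select(from_json($"value" cast "string", eventSchema) as "event")
      .select($"event.*")
      .withColumn("event_time", $"second" cast "timestamp")
    ```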
!!! note "Demo: Kafka Data Source"
    Please start a [Kafka cluster](kafka-data-source.md#start-kafka-cluster) and [spark-shell](kafka-data-source.md#start-spark-shell) as described in [Demo: Kafka Data Source](kafka-data-source.md).

## Define Windowed Streaming Aggregation

!!! note "Windowed Aggregation"
    This demo is different from the source code in the spark-examples repository as it uses a windowed aggregation.

Define a streaming aggregation query (using the [groupBy](../operators/groupBy.md) high-level operator).

The streaming query uses [Append](../OutputMode.md#Append) output mode and defines a [streaming watermark](../watermark/index.md) (using [Dataset.withWatermark](../operators/withWatermark.md) operator). Otherwise, [UnsupportedOperationChecker](../UnsupportedOperationChecker.md) would fail the query (since a watermark is required for `Append` output mode in a streaming aggregation).
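
The actual query definition is collapsed in the diff below. As a rough sketch only (the 10-second watermark delay matches the `T10000ms` marker in the physical plan below, while the window duration and the aggregation functions are assumptions), a windowed streaming aggregation with a watermark could look like this:

```scala
import org.apache.spark.sql.functions.{collect_list, window}

// A sketch, not the demo's exact code:
// group events by id and a 5-second event-time window (duration is an assumption)
// and collect the values and seconds observed in every window.
val windowed = events
  .withWatermark("event_time", "10 seconds")
  .groupBy($"id", window($"event_time", "5 seconds"))
  .agg(
    collect_list($"v") as "vs",
    collect_list($"second") as "seconds")
```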
@@ -113,6 +85,9 @@ ObjectHashAggregate(keys=[id#26, window#66-T10000ms], functions=[collect_list(v#

## Start Streaming Query

!!! note "Append Output Mode"
    This demo is different from the source code in the spark-examples repository as it uses [Append](../OutputMode.md#Append) output mode.

```scala
import java.time.Clock
val timeOffset = Clock.systemUTC.instant.getEpochSecond
@@ -133,19 +108,7 @@ val sq = windowed
.start
```
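
The middle of the code block above is collapsed in the diff. As a sketch of starting a streaming query in `Append` output mode (the console sink, trigger interval, and query name below are assumptions, not necessarily the demo's actual choices):

```scala
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// A sketch only: write the windowed aggregation to the console in Append output mode,
// triggering a micro-batch every 5 seconds.
val sq = windowed
  .writeStream
  .format("console")
  .option("truncate", false)
  .outputMode(OutputMode.Append)
  .trigger(Trigger.ProcessingTime(5.seconds))
  .queryName("streaming-windowed-aggregation-demo")
  .start
```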

The streaming query gets executed and prints out Batch 0 to the console.

```text
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+---+-------+
|id |window|vs |seconds|
+---+------+---+-------+
+---+------+---+-------+
```

### Start Diagnostic Query
### (Optional) Start Diagnostic Query

```scala
import java.time.Clock
```
docs/webui/StreamingQueryStatisticsPage.md (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@

`StreamingQueryStatisticsPage` uses the `id` request parameter as the `runId` of the streaming query for which to [render a page](#render).
!!! tip "Demo: Streaming Aggregation"
Use [Demo: Streaming Aggregation](../demo/streaming-aggregation.md) to learn more and monitor the statistics.
Use [Demo: Streaming Aggregation](../demo/streaming-windowed-aggregation.md) to learn more and monitor the statistics.

## Creating Instance

mkdocs.yml (2 changes: 1 addition & 1 deletion)
@@ -431,7 +431,7 @@ nav:
- groupByKey Streaming Aggregation in Update Mode: demo/groupByKey-count-Update.md
- Internals of FlatMapGroupsWithStateExec Physical Operator: demo/spark-sql-streaming-demo-FlatMapGroupsWithStateExec.md
- Kafka Data Source: demo/kafka-data-source.md
- Streaming Aggregation: demo/streaming-aggregation.md
- Streaming Windowed Aggregation: demo/streaming-windowed-aggregation.md
- RocksDB State Store for Streaming Aggregation: demo/rocksdb-state-store-for-streaming-aggregation.md
- Streaming Query for Running Counts (Socket Source and Complete Output Mode): demo/groupBy-running-count-complete.md
- Streaming Watermark: demo/streaming-watermark.md
