forked from drabastomek/learningPySpark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
structured_streaming_word_count.py
49 lines (40 loc) · 1.19 KB
/
structured_streaming_word_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#
# Structured Streaming Word Count Example
# Original Source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
#
# To run this example:
# Terminal 1: nc -lk 9999
# Terminal 2: ./bin/spark-submit structured_streaming_word_count.py localhost 9999
# Note: type words into Terminal 1; the running counts are printed in Terminal 2
#
# Import the necessary classes and create a local SparkSession
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Build the SparkSession for this application (an already-active session is
# reused by getOrCreate instead of creating a second one)
spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)
# Create DataFrame representing the stream of input lines read from a socket.
# The host and port come from the command line, matching the documented usage
# (`... structured_streaming_word_count.py localhost 9999`); when they are
# omitted the script falls back to the original localhost:9999.
host = sys.argv[1] if len(sys.argv) > 1 else 'localhost'
port = int(sys.argv[2]) if len(sys.argv) > 2 else 9999
lines = (
    spark.readStream
    .format('socket')
    .option('host', host)
    .option('port', port)
    .load()
)
# Break every incoming line on single spaces and emit one row per token,
# exposed as a single column named 'word'
words = lines.select(explode(split(lines.value, ' ')).alias('word'))
# Group the stream by word and keep a running tally of occurrences
wordCounts = (
    words
    .groupBy('word')
    .count()
)
# Launch the streaming query, writing the running counts to the console
# using the 'complete' output mode
query = (
    wordCounts.writeStream
    .outputMode('complete')
    .format('console')
    .start()
)

# Block here until the streaming query terminates
query.awaitTermination()