File tree Expand file tree Collapse file tree 2 files changed +8
-1
lines changed
examples/src/main/python/streaming Expand file tree Collapse file tree 2 files changed +8
-1
lines changed Original file line number Diff line number Diff line change 1
1
import sys
2
2
from operator import add
3
3
4
+ from pyspark .conf import SparkConf
4
5
from pyspark .streaming .context import StreamingContext
5
6
from pyspark .streaming .duration import *
6
7
7
8
if __name__ == "__main__" :
8
9
if len (sys .argv ) != 3 :
9
10
print >> sys .stderr , "Usage: wordcount <hostname> <port>"
10
11
exit (- 1 )
11
- ssc = StreamingContext (appName = "PythonStreamingNetworkWordCount" , duration = Seconds (1 ))
12
+ conf = SparkConf ()
13
+ conf .setAppName ("PythonStreamingNetworkWordCount" )
14
+ conf .set ("spark.default.parallelism" , 1 )
15
+ ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
12
16
13
17
lines = ssc .socketTextStream (sys .argv [1 ], int (sys .argv [2 ]))
14
18
fm_lines = lines .flatMap (lambda x : x .split (" " ))
15
19
filtered_lines = fm_lines .filter (lambda line : "Spark" in line )
16
20
mapped_lines = fm_lines .map (lambda x : (x , 1 ))
21
+ reduced_lines = mapped_lines .reduce (add )
17
22
18
23
fm_lines .pyprint ()
19
24
filtered_lines .pyprint ()
20
25
mapped_lines .pyprint ()
26
+ reduced_lines .pyprint ()
21
27
ssc .start ()
22
28
ssc .awaitTermination ()
Original file line number Diff line number Diff line change 13
13
conf .setAppName ("PythonStreamingWordCount" )
14
14
conf .set ("spark.default.parallelism" , 1 )
15
15
16
+ # still has a bug
16
17
# ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
17
18
ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
18
19
You can’t perform that action at this time.
0 commit comments