File tree Expand file tree Collapse file tree 2 files changed +8
-17
lines changed
examples/src/main/python/streaming Expand file tree Collapse file tree 2 files changed +8
-17
lines changed Original file line number Diff line number Diff line change 14
14
ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
15
15
16
16
lines = ssc .socketTextStream (sys .argv [1 ], int (sys .argv [2 ]))
17
- fm_lines = lines .flatMap (lambda x : x .split (" " ))
18
- mapped_lines = fm_lines .map (lambda x : (x , 1 ))
19
- reduced_lines = mapped_lines .reduceByKey (add )
17
+ words = lines .flatMap (lambda line : line .split (" " ))
18
+ mapped_words = words .map (lambda word : (word , 1 ))
19
+ count = mapped_words .reduceByKey (add )
20
20
21
- reduced_lines .pyprint ()
22
- count_lines = mapped_lines .count ()
23
- count_lines .pyprint ()
21
+ count .pyprint ()
24
22
ssc .start ()
25
23
ssc .awaitTermination ()
Original file line number Diff line number Diff line change 11
11
exit (- 1 )
12
12
conf = SparkConf ()
13
13
conf .setAppName ("PythonStreamingWordCount" )
14
- conf .set ("spark.default.parallelism" , 1 )
15
14
16
- # still has a bug
17
- # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
18
15
ssc = StreamingContext (conf = conf , duration = Seconds (1 ))
19
16
20
17
lines = ssc .textFileStream (sys .argv [1 ])
21
- fm_lines = lines .flatMap (lambda x : x .split (" " ))
22
- filtered_lines = fm_lines .filter (lambda line : "Spark" in line )
23
- mapped_lines = fm_lines .map (lambda x : (x , 1 ))
24
- reduced_lines = mapped_lines .reduceByKey (add )
18
+ words = lines .flatMap (lambda line : line .split (" " ))
19
+ mapped_words = words .map (lambda x : (x , 1 ))
20
+ count = mapped_words .reduceByKey (add )
25
21
26
- fm_lines .pyprint ()
27
- filtered_lines .pyprint ()
28
- mapped_lines .pyprint ()
29
- reduced_lines .pyprint ()
22
+ count .pyprint ()
30
23
ssc .start ()
31
24
ssc .awaitTermination ()
You can’t perform that action at this time.
0 commit comments