File tree Expand file tree Collapse file tree 1 file changed +22
-0
lines changed
examples/src/main/python/streaming Expand file tree Collapse file tree 1 file changed +22
-0
lines changed Original file line number Diff line number Diff line change
1
+ import sys
2
+ from operator import add
3
+
4
+ from pyspark .streaming .context import StreamingContext
5
+ from pyspark .streaming .duration import *
6
+
7
+ if __name__ == "__main__" :
8
+ if len (sys .argv ) != 3 :
9
+ print >> sys .stderr , "Usage: wordcount <hostname> <port>"
10
+ exit (- 1 )
11
+ ssc = StreamingContext (appName = "PythonStreamingNetworkWordCount" , duration = Seconds (1 ))
12
+
13
+ lines = ssc .socketTextStream (sys .argv [1 ], int (sys .argv [2 ]))
14
+ fm_lines = lines .flatMap (lambda x : x .split (" " ))
15
+ filtered_lines = fm_lines .filter (lambda line : "Spark" in line )
16
+ mapped_lines = fm_lines .map (lambda x : (x , 1 ))
17
+
18
+ fm_lines .pyprint ()
19
+ filtered_lines .pyprint ()
20
+ mapped_lines .pyprint ()
21
+ ssc .start ()
22
+ ssc .awaitTermination ()
You can’t perform that action at this time.
0 commit comments