Skip to content

Commit a8c9fd5

Browse files
Ken TakagiwaKen Takagiwa
Ken Takagiwa
authored and
Ken Takagiwa
committed
fixed for socketTextStream
1 parent 1e84f41 commit a8c9fd5

File tree

3 files changed

+6
-22
lines changed

3 files changed

+6
-22
lines changed

examples/src/main/python/streaming/nerwork_wordcount.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
exit(-1)
1111
ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1))
1212

13-
lines = ssc.socketTextStream(sys.argv[1], sys.argv[2])
13+
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
1414
fm_lines = lines.flatMap(lambda x: x.split(" "))
1515
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
1616
mapped_lines = fm_lines.map(lambda x: (x, 1))

python/pyspark/java_gateway.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def run(self):
8585
java_import(gateway.jvm, "org.apache.spark.streaming.*")
8686
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
8787
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
88+
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
8889
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
8990
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
9091
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")

python/pyspark/streaming/context.py

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pyspark.files import SparkFiles
2020
from pyspark.java_gateway import launch_gateway
2121
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
22-
from pyspark.storagelevel import StorageLevel
22+
from pyspark.storagelevel import *
2323
from pyspark.rdd import RDD
2424
from pyspark.context import SparkContext
2525

@@ -83,26 +83,9 @@ def awaitTermination(self, timeout=None):
8383
else:
8484
self._jssc.awaitTermination()
8585

86-
def checkpoint(self, directory):
87-
raise NotImplementedError
88-
89-
def fileStream(self, directory, filter=None, newFilesOnly=None):
90-
raise NotImplementedError
91-
92-
def networkStream(self, receiver):
93-
raise NotImplementedError
94-
95-
def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
96-
raise NotImplementedError
97-
98-
def rawSocketStream(self, hostname, port, storagelevel):
99-
raise NotImplementedError
100-
101-
def remember(self, duration):
102-
raise NotImplementedError
103-
104-
def socketStream(hostname, port, converter,storageLevel):
105-
raise NotImplementedError
86+
# start from simple one. storageLevel is not passed for now.
87+
def socketTextStream(self, hostname, port):
88+
return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())
10689

10790
def start(self):
10891
self._jssc.start()

0 commit comments

Comments
 (0)