
Commit a13ff34

address comments
1 parent 8466916 commit a13ff34

File tree

5 files changed: +69 -42 lines


examples/src/main/python/streaming/hdfs_wordcount.py

Lines changed: 30 additions & 2 deletions
@@ -1,14 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Counts words in new text files created in the given directory
+Usage: hdfs_wordcount.py <directory>
+  <directory> is the directory that Spark Streaming will use to find and read new text files.
+
+To run this on your local machine on directory `localdir`, run this example
+   $ bin/spark-submit examples/src/main/python/streaming/hdfs_wordcount.py localdir
+
+Then create a text file in `localdir` and the words in the file will get counted.
+"""
+
 import sys

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext

 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: wordcount <directory>"
+        print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
         exit(-1)

-    sc = SparkContext(appName="PythonStreamingWordCount")
+    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
     ssc = StreamingContext(sc, 1)

     lines = ssc.textFileStream(sys.argv[1])
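Note: the hunk ends at the creation of the input file stream; the counting half of the example is outside this diff. A rough sketch of how such a stream is typically consumed (illustrative only, not lines from this commit):

    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()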

examples/src/main/python/streaming/network_wordcount.py

Lines changed: 29 additions & 1 deletion
@@ -1,11 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+Usage: network_wordcount.py <hostname> <port>
+  <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+To run this on your local machine, you need to first run a Netcat server
+   `$ nc -lk 9999`
+and then run the example
+   `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
+"""
+
 import sys

 from pyspark import SparkContext
 from pyspark.streaming import StreamingContext

 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: wordcount <hostname> <port>"
+        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
         exit(-1)
     sc = SparkContext(appName="PythonStreamingNetworkWordCount")
     ssc = StreamingContext(sc, 1)
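Note: as with the HDFS example, the diff stops before the input stream is created. For a TCP source the usual pattern is a socket text stream (a sketch, not part of this commit); the counting pipeline then mirrors the one outlined under hdfs_wordcount.py above:

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))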

python/pyspark/streaming/context.py

Lines changed: 5 additions & 5 deletions
@@ -234,11 +234,11 @@ def transform(self, dstreams, transformFunc):
         jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
                                             SparkContext._gateway._gateway_client)
         # change the final serializer to sc.serializer
-        jfunc = RDDFunction(self._sc,
-                            lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
-                            *[d._jrdd_deserializer for d in dstreams])
-
-        jdstream = self._jvm.PythonDStream.callTransform(self._jssc, jdstreams, jfunc)
+        func = RDDFunction(self._sc,
+                           lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
+                           *[d._jrdd_deserializer for d in dstreams])
+        jfunc = self._jvm.RDDFunction(func)
+        jdstream = self._jssc.transform(jdstreams, jfunc)
         return DStream(jdstream, self, self._sc.serializer)

     def union(self, *dstreams):
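The rewritten path wraps the Python-side RDDFunction in a JVM RDDFunction and hands it straight to JavaStreamingContext.transform, instead of going through the removed PythonDStream.callTransform helper. From Python the entry point is used roughly like this (stream names and the merge function are illustrative, not code from the patch); transformFunc receives the per-batch RDDs, one per input DStream:

    def union_batches(rdds):
        # one RDD per input DStream for the current batch interval
        a, b = rdds
        return a.union(b)

    merged = ssc.transform([stream1, stream2], union_batches)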

python/pyspark/streaming/dstream.py

Lines changed: 0 additions & 3 deletions
@@ -150,9 +150,6 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
150150
"""
151151
return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))
152152

153-
# def foreach(self, func):
154-
# return self.foreachRDD(lambda _, rdd: rdd.foreach(func))
155-
156153
def foreachRDD(self, func):
157154
"""
158155
Apply a function to each RDD in this DStream.
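With the commented-out foreach helper gone, foreachRDD stays the single hook for per-batch side effects. Judging from the removed comment, the callback in this branch takes the batch time and the RDD; a hedged sketch with illustrative names:

    def process(time, rdd):
        # runs on the driver once per batch; collect only small results
        print "batch at %s:" % time, rdd.collect()

    counts.foreachRDD(process)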

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 5 additions & 31 deletions
@@ -47,12 +47,12 @@ private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction)
   extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {

   def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    PythonDStream.some(pfunc.call(time.milliseconds, List(PythonDStream.wrapRDD(rdd)).asJava))
+    Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava)).map(_.rdd)
   }

   def apply(rdd: Option[RDD[_]], rdd2: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
-    val rdds = List(PythonDStream.wrapRDD(rdd), PythonDStream.wrapRDD(rdd2)).asJava
-    PythonDStream.some(pfunc.call(time.milliseconds, rdds))
+    val rdds = List(rdd.map(JavaRDD.fromRDD(_)).orNull, rdd2.map(JavaRDD.fromRDD(_)).orNull).asJava
+    Option(pfunc.call(time.milliseconds, rdds)).map(_.rdd)
   }

   // for function.Function2

@@ -115,39 +115,13 @@ private[python] object PythonDStream {
     serializer = new RDDFunctionSerializer(ser)
   }

-  // convert Option[RDD[_]] to JavaRDD, handle null gracefully
-  def wrapRDD(rdd: Option[RDD[_]]): JavaRDD[_] = {
-    if (rdd.isDefined) {
-      JavaRDD.fromRDD(rdd.get)
-    } else {
-      null
-    }
-  }
-
-  // convert JavaRDD to Option[RDD[Array[Byte]]], handle null gracefully
-  def some(jrdd: JavaRDD[Array[Byte]]): Option[RDD[Array[Byte]]] = {
-    if (jrdd != null) {
-      Some(jrdd.rdd)
-    } else {
-      None
-    }
-  }
-
   // helper function for DStream.foreachRDD(),
   // cannot be `foreachRDD`, it will confusing py4j
-  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction){
+  def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction) {
     val func = new RDDFunction((pfunc))
     jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
   }

-  // helper function for ssc.transform()
-  def callTransform(ssc: JavaStreamingContext, jdsteams: JList[JavaDStream[_]],
-                    pyfunc: PythonRDDFunction)
-    :JavaDStream[Array[Byte]] = {
-    val func = new RDDFunction(pyfunc)
-    ssc.transform(jdsteams, func)
-  }
-
   // convert list of RDD into queue of RDDs, for ssc.queueStream()
   def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]]): java.util.Queue[JavaRDD[Array[Byte]]] = {
     val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]

@@ -232,7 +206,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
     func(parent.getOrCompute(validTime), parent2.getOrCompute(validTime), validTime)
   }

-  val asJavaDStream  = JavaDStream.fromDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }

 /**
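The helpers that survive are the ones still reachable from Python: callForeachRDD backs DStream.foreachRDD, and toRDDQueue backs StreamingContext.queueStream. Assuming the Python queueStream wrapper in this branch takes a list of RDDs, as the comment above toRDDQueue suggests, a test-style sketch:

    # feed three small RDDs into the stream, roughly one per batch
    queue = [sc.parallelize(range(5)) for _ in range(3)]
    stream = ssc.queueStream(queue)
    stream.map(lambda x: x * 2).pprint()
    ssc.start()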
