Skip to content

Commit bebeb4a

Browse files
committed
address all comments
1 parent 6db00da commit bebeb4a

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,15 @@ def takeAndPrint(time, rdd):
174174
def mapValues(self, f):
175175
"""
176176
Return a new DStream by applying a map function to the value of
177-
each key-value pairs in 'this' DStream without changing the key.
177+
each key-value pairs in this DStream without changing the key.
178178
"""
179179
map_values_fn = lambda (k, v): (k, f(v))
180180
return self.map(map_values_fn, preservesPartitioning=True)
181181

182182
def flatMapValues(self, f):
183183
"""
184184
Return a new DStream by applying a flatmap function to the value
185-
of each key-value pairs in 'this' DStream without changing the key.
185+
of each key-value pairs in this DStream without changing the key.
186186
"""
187187
flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
188188
return self.flatMap(flat_map_fn, preservesPartitioning=True)
@@ -276,7 +276,7 @@ def saveAsTextFile(t, rdd):
276276
def transform(self, func):
277277
"""
278278
Return a new DStream in which each RDD is generated by applying a function
279-
on each RDD of 'this' DStream.
279+
on each RDD of this DStream.
280280
281281
`func` can have one argument of `rdd`, or have two arguments of
282282
(`time`, `rdd`)
@@ -290,7 +290,7 @@ def transform(self, func):
290290
def transformWith(self, func, other, keepSerializer=False):
291291
"""
292292
Return a new DStream in which each RDD is generated by applying a function
293-
on each RDD of 'this' DStream and 'other' DStream.
293+
on each RDD of this DStream and 'other' DStream.
294294
295295
`func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
296296
arguments of (`time`, `rdd_a`, `rdd_b`)

python/pyspark/streaming/util.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424

2525
class TransformFunction(object):
2626
"""
27-
This class is for py4j callback.
27+
This class wraps a function RDD[X] -> RDD[Y] that was passed to
28+
DStream.transform(), allowing it to be called from Java via Py4J's
29+
callback server.
30+
31+
Java calls this function with a sequence of JavaRDDs and this function
32+
returns a single JavaRDD pointer back to Java.
2833
"""
2934
_emptyRDD = None
3035

@@ -63,6 +68,16 @@ class Java:
6368

6469

6570
class TransformFunctionSerializer(object):
71+
"""
72+
This class implements a serializer for PythonTransformFunction Java
73+
objects.
74+
75+
This is necessary because the Java PythonTransformFunction objects are
76+
actually Py4J references to Python objects and thus are not directly
77+
serializable. When Java needs to serialize a PythonTransformFunction,
78+
it uses this class to invoke Python, which returns the serialized function
79+
as a byte array.
80+
"""
6681
def __init__(self, ctx, serializer, gateway=None):
6782
self.ctx = ctx
6883
self.serializer = serializer

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@ private[python] trait PythonTransformFunctionSerializer {
5151
}
5252

5353
/**
54-
* Wrapper for PythonTransformFunction
54+
* Wraps a PythonTransformFunction (which is a Python object accessed through Py4J)
55+
* so that it looks like a Scala function and can be transparently serialized and
56+
* deserialized by Java.
5557
*/
5658
private[python] class TransformFunction(@transient var pfunc: PythonTransformFunction)
57-
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
59+
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] {
5860

5961
def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
6062
Option(pfunc.call(time.milliseconds, List(rdd.map(JavaRDD.fromRDD(_)).orNull).asJava))
@@ -87,6 +89,9 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
8789

8890
/**
8991
* Helpers for PythonTransformFunctionSerializer
92+
*
93+
* PythonTransformFunctionSerializer is logically a singleton that's happens to be
94+
* implemented as a Python object.
9095
*/
9196
private[python] object PythonTransformFunctionSerializer {
9297

@@ -119,7 +124,7 @@ private[python] object PythonTransformFunctionSerializer {
119124
}
120125

121126
/**
122-
* Helper functions
127+
* Helper functions, which are called from Python via Py4J.
123128
*/
124129
private[python] object PythonDStream {
125130

0 commit comments

Comments
 (0)