Skip to content

Commit ff88bec

Browse files
committed
rename RDDFunction to TransformFunction
1 parent d328aca commit ff88bec

File tree

4 files changed

+49
-49
lines changed

4 files changed

+49
-49
lines changed

python/pyspark/streaming/context.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pyspark.context import SparkContext
2626
from pyspark.storagelevel import StorageLevel
2727
from pyspark.streaming.dstream import DStream
28-
from pyspark.streaming.util import RDDFunction, RDDFunctionSerializer
28+
from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer
2929

3030
__all__ = ["StreamingContext"]
3131

@@ -114,10 +114,10 @@ def _ensure_initialized(cls):
114114
java_import(gw.jvm, "org.apache.spark.streaming.*")
115115
java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
116116
java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
117-
# register serializer for RDDFunction
117+
# register serializer for TransformFunction
118118
# this happens before the SparkContext is created when recovering from a checkpoint
119-
cls._transformerSerializer = RDDFunctionSerializer(SparkContext._active_spark_context,
120-
CloudPickleSerializer(), gw)
119+
cls._transformerSerializer = TransformFunctionSerializer(
120+
SparkContext._active_spark_context, CloudPickleSerializer(), gw)
121121
gw.jvm.PythonDStream.registerSerializer(cls._transformerSerializer)
122122

123123
@classmethod
@@ -284,10 +284,10 @@ def transform(self, dstreams, transformFunc):
284284
jdstreams = ListConverter().convert([d._jdstream for d in dstreams],
285285
SparkContext._gateway._gateway_client)
286286
# change the final serializer to sc.serializer
287-
func = RDDFunction(self._sc,
288-
lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
289-
*[d._jrdd_deserializer for d in dstreams])
290-
jfunc = self._jvm.RDDFunction(func)
287+
func = TransformFunction(self._sc,
288+
lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
289+
*[d._jrdd_deserializer for d in dstreams])
290+
jfunc = self._jvm.TransformFunction(func)
291291
jdstream = self._jssc.transform(jdstreams, jfunc)
292292
return DStream(jdstream, self, self._sc.serializer)
293293

python/pyspark/streaming/dstream.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
from pyspark import RDD
2424
from pyspark.storagelevel import StorageLevel
25-
from pyspark.streaming.util import rddToFileName, RDDFunction
25+
from pyspark.streaming.util import rddToFileName, TransformFunction
2626
from pyspark.rdd import portable_hash
2727
from pyspark.resultiterable import ResultIterable
2828

@@ -154,7 +154,7 @@ def foreachRDD(self, func):
154154
"""
155155
Apply a function to each RDD in this DStream.
156156
"""
157-
jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer)
157+
jfunc = TransformFunction(self.ctx, func, self._jrdd_deserializer)
158158
api = self._ssc._jvm.PythonDStream
159159
api.callForeachRDD(self._jdstream, jfunc)
160160

@@ -292,7 +292,7 @@ def transformWith(self, func, other, keepSerializer=False):
292292
oldfunc = func
293293
func = lambda t, a, b: oldfunc(a, b)
294294
assert func.func_code.co_argcount == 3, "func should take two or three arguments"
295-
jfunc = RDDFunction(self.ctx, func, self._jrdd_deserializer, other._jrdd_deserializer)
295+
jfunc = TransformFunction(self.ctx, func, self._jrdd_deserializer, other._jrdd_deserializer)
296296
dstream = self.ctx._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
297297
other._jdstream.dstream(), jfunc)
298298
jrdd_serializer = self._jrdd_deserializer if keepSerializer else self.ctx.serializer
@@ -535,9 +535,9 @@ def invReduceFunc(t, a, b):
535535
joined = a.leftOuterJoin(b, numPartitions)
536536
return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
537537

538-
jreduceFunc = RDDFunction(self.ctx, reduceFunc, reduced._jrdd_deserializer)
538+
jreduceFunc = TransformFunction(self.ctx, reduceFunc, reduced._jrdd_deserializer)
539539
if invReduceFunc:
540-
jinvReduceFunc = RDDFunction(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
540+
jinvReduceFunc = TransformFunction(self.ctx, invReduceFunc, reduced._jrdd_deserializer)
541541
else:
542542
jinvReduceFunc = None
543543
if slideDuration is None:
@@ -568,8 +568,8 @@ def reduceFunc(t, a, b):
568568
state = g.mapPartitions(lambda x: updateFunc(x))
569569
return state.filter(lambda (k, v): v is not None)
570570

571-
jreduceFunc = RDDFunction(self.ctx, reduceFunc,
572-
self.ctx.serializer, self._jrdd_deserializer)
571+
jreduceFunc = TransformFunction(self.ctx, reduceFunc,
572+
self.ctx.serializer, self._jrdd_deserializer)
573573
dstream = self.ctx._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
574574
return DStream(dstream.asJavaDStream(), self._ssc, self.ctx.serializer)
575575

@@ -609,7 +609,7 @@ def _jdstream(self):
609609
return self._jdstream_val
610610

611611
func = self.func
612-
jfunc = RDDFunction(self.ctx, func, self.prev._jrdd_deserializer)
612+
jfunc = TransformFunction(self.ctx, func, self.prev._jrdd_deserializer)
613613
jdstream = self.ctx._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(),
614614
jfunc, self.reuse).asJavaDStream()
615615
self._jdstream_val = jdstream

python/pyspark/streaming/util.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from pyspark import SparkContext, RDD
2222

2323

24-
class RDDFunction(object):
24+
class TransformFunction(object):
2525
"""
2626
This class is for py4j callback.
2727
"""
@@ -58,13 +58,13 @@ def call(self, milliseconds, jrdds):
5858
traceback.print_exc()
5959

6060
def __repr__(self):
61-
return "RDDFunction(%s)" % self.func
61+
return "TransformFunction(%s)" % self.func
6262

6363
class Java:
64-
implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']
64+
implements = ['org.apache.spark.streaming.api.python.PythonTransformFunction']
6565

6666

67-
class RDDFunctionSerializer(object):
67+
class TransformFunctionSerializer(object):
6868
def __init__(self, ctx, serializer, gateway=None):
6969
self.ctx = ctx
7070
self.serializer = serializer
@@ -80,15 +80,15 @@ def dumps(self, id):
8080
def loads(self, bytes):
8181
try:
8282
f, deserializers = self.serializer.loads(str(bytes))
83-
return RDDFunction(self.ctx, f, *deserializers)
83+
return TransformFunction(self.ctx, f, *deserializers)
8484
except Exception:
8585
traceback.print_exc()
8686

8787
def __repr__(self):
88-
return "RDDFunctionSerializer(%s)" % self.serializer
88+
return "TransformFunctionSerializer(%s)" % self.serializer
8989

9090
class Java:
91-
implements = ['org.apache.spark.streaming.api.python.PythonRDDFunctionSerializer']
91+
implements = ['org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer']
9292

9393

9494
def rddToFileName(prefix, suffix, time):

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ import org.apache.spark.streaming.api.java._
3535
/**
3636
* Interface for Python callback function with three arguments
3737
*/
38-
private[python] trait PythonRDDFunction {
38+
private[python] trait PythonTransformFunction {
3939
def call(time: Long, rdds: JList[_]): JavaRDD[Array[Byte]]
4040
}
4141

4242
/**
43-
* Wrapper for PythonRDDFunction
43+
* Wrapper for PythonTransformFunction
4444
* TODO: support checkpoint
4545
*/
46-
private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction)
46+
private[python] class TransformFunction(@transient var pfunc: PythonTransformFunction)
4747
extends function.Function2[JList[JavaRDD[_]], Time, JavaRDD[Array[Byte]]] with Serializable {
4848

4949
def apply(rdd: Option[RDD[_]], time: Time): Option[RDD[Array[Byte]]] = {
@@ -77,27 +77,27 @@ private[python] class RDDFunction(@transient var pfunc: PythonRDDFunction)
7777
}
7878

7979
/**
80-
* Interface for Python Serializer to serialize PythonRDDFunction
80+
* Interface for Python Serializer to serialize PythonTransformFunction
8181
*/
82-
private[python] trait PythonRDDFunctionSerializer {
82+
private[python] trait PythonTransformFunctionSerializer {
8383
def dumps(id: String): Array[Byte] //
84-
def loads(bytes: Array[Byte]): PythonRDDFunction
84+
def loads(bytes: Array[Byte]): PythonTransformFunction
8585
}
8686

8787
/**
88-
* Wrapper for PythonRDDFunctionSerializer
88+
* Wrapper for PythonTransformFunctionSerializer
8989
*/
90-
private[python] class RDDFunctionSerializer(pser: PythonRDDFunctionSerializer) {
91-
def serialize(func: PythonRDDFunction): Array[Byte] = {
92-
// get the id of PythonRDDFunction in py4j
90+
private[python] class TransformFunctionSerializer(pser: PythonTransformFunctionSerializer) {
91+
def serialize(func: PythonTransformFunction): Array[Byte] = {
92+
// get the id of PythonTransformFunction in py4j
9393
val h = Proxy.getInvocationHandler(func.asInstanceOf[Proxy])
9494
val f = h.getClass().getDeclaredField("id")
9595
f.setAccessible(true)
9696
val id = f.get(h).asInstanceOf[String]
9797
pser.dumps(id)
9898
}
9999

100-
def deserialize(bytes: Array[Byte]): PythonRDDFunction = {
100+
def deserialize(bytes: Array[Byte]): PythonTransformFunction = {
101101
pser.loads(bytes)
102102
}
103103
}
@@ -107,18 +107,18 @@ private[python] class RDDFunctionSerializer(pser: PythonRDDFunctionSerializer) {
107107
*/
108108
private[python] object PythonDStream {
109109

110-
// A serializer in Python, used to serialize PythonRDDFunction
111-
var serializer: RDDFunctionSerializer = _
110+
// A serializer in Python, used to serialize PythonTransformFunction
111+
var serializer: TransformFunctionSerializer = _
112112

113113
// Register a serializer from Python, should be called during initialization
114-
def registerSerializer(ser: PythonRDDFunctionSerializer) = {
115-
serializer = new RDDFunctionSerializer(ser)
114+
def registerSerializer(ser: PythonTransformFunctionSerializer) = {
115+
serializer = new TransformFunctionSerializer(ser)
116116
}
117117

118118
// helper function for DStream.foreachRDD(),
119119
// cannot be named `foreachRDD`, because that would confuse py4j
120-
def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonRDDFunction) {
121-
val func = new RDDFunction((pfunc))
120+
def callForeachRDD(jdstream: JavaDStream[Array[Byte]], pfunc: PythonTransformFunction) {
121+
val func = new TransformFunction((pfunc))
122122
jdstream.dstream.foreachRDD((rdd, time) => func(Some(rdd), time))
123123
}
124124

@@ -134,10 +134,10 @@ private[python] object PythonDStream {
134134
* Base class for PythonDStream with some common methods
135135
*/
136136
private[python]
137-
abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonRDDFunction)
137+
abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonTransformFunction)
138138
extends DStream[Array[Byte]] (parent.ssc) {
139139

140-
val func = new RDDFunction(pfunc)
140+
val func = new TransformFunction(pfunc)
141141

142142
override def dependencies = List(parent)
143143

@@ -153,7 +153,7 @@ abstract class PythonDStream(parent: DStream[_], @transient pfunc: PythonRDDFunc
153153
* as a template for future use; this can reduce the Python callbacks.
154154
*/
155155
private[python]
156-
class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonRDDFunction,
156+
class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonTransformFunction,
157157
var reuse: Boolean = false)
158158
extends PythonDStream(parent, pfunc) {
159159

@@ -193,10 +193,10 @@ class PythonTransformedDStream (parent: DStream[_], @transient pfunc: PythonRDDF
193193
*/
194194
private[python]
195195
class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
196-
@transient pfunc: PythonRDDFunction)
196+
@transient pfunc: PythonTransformFunction)
197197
extends DStream[Array[Byte]] (parent.ssc) {
198198

199-
val func = new RDDFunction(pfunc)
199+
val func = new TransformFunction(pfunc)
200200

201201
override def slideDuration: Duration = parent.slideDuration
202202

@@ -213,7 +213,7 @@ class PythonTransformed2DStream(parent: DStream[_], parent2: DStream[_],
213213
* similar to StateDStream
214214
*/
215215
private[python]
216-
class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonRDDFunction)
216+
class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: PythonTransformFunction)
217217
extends PythonDStream(parent, reduceFunc) {
218218

219219
super.persist(StorageLevel.MEMORY_ONLY)
@@ -235,16 +235,16 @@ class PythonStateDStream(parent: DStream[Array[Byte]], @transient reduceFunc: Py
235235
*/
236236
private[python]
237237
class PythonReducedWindowedDStream(parent: DStream[Array[Byte]],
238-
@transient preduceFunc: PythonRDDFunction,
239-
@transient pinvReduceFunc: PythonRDDFunction,
238+
@transient preduceFunc: PythonTransformFunction,
239+
@transient pinvReduceFunc: PythonTransformFunction,
240240
_windowDuration: Duration,
241241
_slideDuration: Duration
242242
) extends PythonDStream(parent, preduceFunc) {
243243

244244
super.persist(StorageLevel.MEMORY_ONLY)
245245
override val mustCheckpoint = true
246246

247-
val invReduceFunc = new RDDFunction(pinvReduceFunc)
247+
val invReduceFunc = new TransformFunction(pinvReduceFunc)
248248

249249
def windowDuration: Duration = _windowDuration
250250
override def slideDuration: Duration = _slideDuration

0 commit comments

Comments
 (0)