
Commit 171edeb

clean up
1 parent 4dedd2d commit 171edeb

8 files changed: +46, -99 lines changed

python/pyspark/streaming/context.py

Lines changed: 5 additions & 5 deletions

@@ -16,7 +16,6 @@
 #
 
 import sys
-import time
 from signal import signal, SIGTERM, SIGINT
 
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
@@ -143,17 +142,18 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This function is only for test.
-        This implementation is inspired by QueStream implementation.
-        Give list of RDD to generate DStream which contains the RDD.
+        This function is only for unittest.
+        It requires a sequence as input, and returns the i_th element at the i_th batch
+        under manual clock.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
             test_rdd = self._sc.parallelize(test_input, numSlices)
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
-
+        # All deserializers have to be the same.
+        # TODO: add deserializer validation
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
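The new docstring spells out the manual-clock contract: the i-th element of the input sequence becomes the RDD for the i-th batch. A minimal sketch of how a test might drive it (the ssc variable and the two-batch input are illustrative, not part of this commit):

    # hypothetical test setup; ssc is a StreamingContext running under the manual clock
    test_input = [[1, 2, 3], [4, 5, 6]]              # batch 1 -> [1, 2, 3], batch 2 -> [4, 5, 6]
    test_stream = ssc._testInputStream(test_input)   # each inner list is parallelized into one RDD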

python/pyspark/streaming/dstream.py

Lines changed: 11 additions & 17 deletions

@@ -276,23 +276,6 @@ def func(iterator):
             yield list(iterator)
         return self.mapPartitions(func)
 
-    #def transform(self, func): - TD
-    #    from utils import RDDFunction
-    #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
-    #    jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
-    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
-
-    def _test_output(self, result):
-        """
-        This function is only for test case.
-        Store data in a DStream to result to verify the result in test case
-        """
-        def get_output(rdd, time):
-            taken = rdd.collect()
-            result.append(taken)
-
-        self.foreachRDD(get_output)
-
     def cache(self):
         """
         Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -398,6 +381,17 @@ def saveAsPickleFile(rdd, time):
 
         return self.foreachRDD(saveAsPickleFile)
 
+    def _test_output(self, result):
+        """
+        This function is only for test cases.
+        Store the data in a DStream into result to verify the output in a test case.
+        """
+        def get_output(rdd, time):
+            collected = rdd.collect()
+            result.append(collected)
+
+        self.foreachRDD(get_output)
+
 
 # TODO: implement updateStateByKey
 # TODO: implement slice
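The relocated _test_output helper works by registering a per-batch callback through foreachRDD and appending each collected batch to a caller-supplied list. A rough sketch of the pattern (the result and dstream names are illustrative):

    result = []                   # shared list the callback appends into
    dstream._test_output(result)  # registers get_output via foreachRDD
    # after the batches have run under the manual clock, result holds one
    # collected list per batch, e.g. [[1, 2, 3], [4, 5, 6]]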

python/pyspark/streaming/jtime.py

Lines changed: 2 additions & 1 deletion

@@ -19,10 +19,11 @@
 from pyspark.streaming.duration import Duration
 
 """
-The name of this file, time is not good naming for python
+The name of this file, time, is not a good name for python
 because if we do import time when we want to use native python time package, it does
 not import python time package.
 """
+# TODO: add doctest
 
 
 class Time(object):
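The naming concern in that docstring is the usual module-shadowing problem in Python 2: a package-local time.py would be found before the standard library module. A small illustration of what the comment implies (assuming the Python 2 import semantics of the era):

    # if this file were named time.py, "import time" inside pyspark/streaming/
    # would resolve to the package-local module instead of the stdlib one,
    # unless absolute imports are forced:
    from __future__ import absolute_import
    import time   # now refers to the standard library time module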

python/pyspark/streaming/utils.py

Lines changed: 9 additions & 3 deletions

@@ -19,6 +19,9 @@
 
 
 class RDDFunction():
+    """
+    This class is for the py4j callback.
+    """
     def __init__(self, ctx, jrdd_deserializer, func):
         self.ctx = ctx
         self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:
 
 
 def msDurationToString(ms):
+    # TODO: add doctest
     """
     Returns a human-readable string representing a duration such as "35ms"
     """
@@ -54,8 +58,10 @@ def msDurationToString(ms):
     else:
         return "%.2f h" % (float(ms) / hour)
 
+
 def rddToFileName(prefix, suffix, time):
-    if suffix is not None:
-        return prefix + "-" + str(time) + "." + suffix
-    else:
+    # TODO: add doctest
+    if suffix is None:
         return prefix + "-" + str(time)
+    else:
+        return prefix + "-" + str(time) + "." + suffix

python/pyspark/streaming_tests.py

Lines changed: 8 additions & 7 deletions

@@ -20,9 +20,10 @@
 individual modules.
 
 This file would be merged to tests.py after all functions are ready.
-But for now, this file is separated due to focusing to streaming test case.
+Since the python API for streaming is beta, this file is separated.
 
-Callback server seems like unstable sometimes, which cause error in test case.
+Callback server is sometimes unstable, which causes errors in test cases.
+But this is a very rare case.
 
 """
 from itertools import chain
@@ -58,15 +59,14 @@ def tearDownClass(cls):
 class TestBasicOperationsSuite(PySparkStreamingTestCase):
     """
     2 tests for each function for batach deserializer and unbatch deserilizer because
-    we cannot change the deserializer after streaming process starts.
+    the deserializer is not changed dynamically after the streaming process starts.
     Default numInputPartitions is 2.
     If the number of input element is over 3, that DStream use batach deserializer.
     If not, that DStream use unbatch deserializer.
 
-    Most of the operation uses UTF8 deserializer to get value from Scala.
-    I am wondering if these test are enough or not.
-    All tests input should have list of lists. This represents stream.
+    All test input should be a list of lists. This list represents the stream.
     Every batch interval, the first object of list are chosen to make DStream.
+    e.g. the first list in the list is the input of the first batch.
     Please see the BasicTestSuits in Scala which is close to this implementation.
     """
     def setUp(self):
@@ -412,7 +412,7 @@ def _sort_result_based_on_key(self, outputs):
 
     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """
-        Start stream and return the output.
+        Start stream and return the result.
         @param test_input: dataset for the test. This should be list of lists.
         @param test_func: wrapped test_function. This function should return PythonDstream object.
         @param expexted_output: expected output for this testcase.
@@ -444,6 +444,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
 
         return result
 
+
 class TestSaveAsFilesSuite(PySparkStreamingTestCase):
     def setUp(self):
         PySparkStreamingTestCase.setUp(self)
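As the revised docstring explains, every test input is a list of lists and the i-th inner list becomes the input of the i-th batch. A small illustration of the convention (values are invented):

    test_input = [[1, 2], [3, 4, 5, 6], [7, 8]]   # three batches
    # batch 1 gets [1, 2]        -> 2 elements, so the unbatched deserializer path is used
    # batch 2 gets [3, 4, 5, 6]  -> more than 3 elements, so the batched deserializer path is used
    # batch 3 gets [7, 8]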

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 7 additions & 24 deletions

@@ -18,10 +18,8 @@
 package org.apache.spark.streaming.api.python
 
 import java.io._
-import java.io.{ObjectInputStream, IOException}
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
 
@@ -56,7 +54,9 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
-        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
+        // create PythonRDD to compute Python functions.
+        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
+          preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
     }
@@ -81,8 +81,8 @@ DStream[Array[Byte]](prev.ssc){
       case Some(rdd)=>Some(rdd)
        val pairwiseRDD = new PairwiseRDD(rdd)
        /*
-        * Since python operation is executed by Scala after StreamingContext.start.
-        * What PythonPairwiseDStream does is equivalent to python code in pySpark.
+        * Since python function is executed by Scala after StreamingContext.start.
+        * What PythonPairwiseDStream does is equivalent to python code in pyspark.
         *
         * with _JavaStackTrace(self.context) as st:
         *    pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -99,6 +99,7 @@ DStream[Array[Byte]](prev.ssc){
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
 
+
 class PythonForeachDStream(
     prev: DStream[Array[Byte]],
     foreachFunction: PythonRDDFunction
@@ -112,29 +113,11 @@ class PythonForeachDStream(
   this.register()
 }
 
-class PythonTransformedDStream(
-    prev: DStream[Array[Byte]],
-    transformFunction: PythonRDDFunction
-  ) extends DStream[Array[Byte]](prev.ssc) {
-
-  override def dependencies = List(prev)
-
-  override def slideDuration: Duration = prev.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    prev.getOrCompute(validTime).map(rdd => {
-      transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
-    })
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
 
 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
 * returns the i_th element at the i_th batch under manual clock.
-* This implementation is inspired by QueStream
 */
 
 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java

Lines changed: 4 additions & 0 deletions

@@ -3,6 +3,10 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.Time;
 
+/*
+ * Interface for py4j callback function.
+ * This function is called by pyspark.streaming.dstream.DStream.foreachRDD .
+ */
 public interface PythonRDDFunction {
   JavaRDD<byte[]> call(JavaRDD<byte[]> rdd, long time);
 }
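On the Python side, the function handed to DStream.foreachRDD is wrapped in pyspark.streaming.utils.RDDFunction, the py4j callback object that implements this interface. A rough sketch of the Python-side usage (save_batch is illustrative, and the exact wiring is an assumption rather than something shown in this diff):

    def save_batch(rdd, time):
        # called once per batch with that batch's RDD and the batch time
        rdd.saveAsTextFile("batch-%d" % time)

    dstream.foreachRDD(save_batch)   # wrapped into an RDDFunction and driven by PythonForeachDStream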

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 0 additions & 42 deletions
This file was deleted.
