Commit ce2acd2

WIP added test case
1 parent 9ad6855 commit ce2acd2

8 files changed: +132 -30 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (-2)

@@ -350,8 +350,6 @@ private[spark] object PythonRDD extends Logging {
     } catch {
       case eof: EOFException => {}
     }
-    println("RDDDD ==================")
-    println(objs)
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

examples/src/main/python/streaming/test_oprations.py (+16 -9)

@@ -9,15 +9,22 @@
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
-    ssc.checkpoint("/tmp/spark_ckp")

-    test_input = ssc._testInputStream([[1],[1],[1]])
-    # ssc.checkpoint("/tmp/spark_ckp")
-    fm_test = test_input.flatMap(lambda x: x.split(" "))
-    mapped_test = fm_test.map(lambda x: (x, 1))
+    test_input = ssc._testInputStream([1,2,3])
+    class buff:
+        pass
+
+    fm_test = test_input.map(lambda x: (x, 1))
+    fm_test.test_output(buff)

-
-    mapped_test.print_()
     ssc.start()
-    # ssc.awaitTermination()
-    # ssc.stop()
+    while True:
+        ssc.awaitTermination(50)
+        try:
+            buff.result
+            break
+        except AttributeError:
+            pass
+
+    ssc.stop()
+    print buff.result
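The new example waits for the streaming job by polling: it calls awaitTermination with a short timeout and then probes buff.result, relying on the fact that reading a class attribute that has never been assigned raises AttributeError. A minimal plain-Python sketch of that idiom (no Spark involved; the thread and sleeps below are stand-ins for the streaming scheduler and for ssc.awaitTermination) looks like this:

import threading
import time

class buff:
    pass

def fake_streaming_job():
    time.sleep(0.2)                          # stand-in for the first batch being processed
    buff.result = [(1, 1), (2, 1), (3, 1)]   # stand-in for the collected output

threading.Thread(target=fake_streaming_job).start()

while True:
    time.sleep(0.05)                         # stand-in for ssc.awaitTermination(50)
    try:
        buff.result                          # raises AttributeError until the job assigns it
        break
    except AttributeError:
        pass

print(buff.result)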

python/pyspark/streaming/context.py (+7 -9)

@@ -100,10 +100,10 @@ def awaitTermination(self, timeout=None):
         """
         Wait for the execution to stop.
         """
-        if timeout:
-            self._jssc.awaitTermination(timeout)
-        else:
+        if timeout is None:
             self._jssc.awaitTermination()
+        else:
+            self._jssc.awaitTermination(timeout)

     # start from simple one. storageLevel is not passed for now.
     def socketTextStream(self, hostname, port):
@@ -137,6 +137,7 @@ def stop(self, stopSparkContext=True):

     def checkpoint(self, directory):
         """
+        Not tested
         """
         self._jssc.checkpoint(directory)

@@ -147,8 +148,7 @@ def _testInputStream(self, test_input, numSlices=None):
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().

-        #tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-        tempFile = open("/tmp/spark_rdd", "wb")
+        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)

         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(test_input):
@@ -160,10 +160,8 @@ def _testInputStream(self, test_input, numSlices=None):
         else:
             serializer = self._sc._unbatched_serializer
         serializer.dump_stream(test_input, tempFile)
-        tempFile.flush()
-        tempFile.close()
-        print tempFile.name
+
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
                                                         tempFile.name,
                                                         numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, UTF8Deserializer())
+        return DStream(jinput_stream, self, PickleSerializer())
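_testInputStream now writes the serialized test input to a NamedTemporaryFile and passes the file name to PythonTestInputStream on the JVM side. A rough, Spark-free sketch of that temp-file round trip (pickle is used here as a stand-in for PySpark's batched serializers, and the function simply returns the path instead of building a JVM stream):

import pickle
from tempfile import NamedTemporaryFile

def write_test_input(test_input):
    # serialize each batch to a named temp file, like serializer.dump_stream(...)
    temp_file = NamedTemporaryFile(delete=False)
    for batch in test_input:
        pickle.dump(batch, temp_file)
    temp_file.close()
    return temp_file.name   # the JVM side would read records back from this path

path = write_test_input([[1, 2, 3], [4, 5, 6]])
with open(path, "rb") as f:
    print(pickle.load(f))   # [1, 2, 3]
    print(pickle.load(f))   # [4, 5, 6]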

python/pyspark/streaming/dstream.py (+18 -4)

@@ -47,7 +47,7 @@ def _sum(self):
         """
         return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

-    def print_(self):
+    def print_(self, label=None):
         """
         Since print is reserved name for python, we cannot define a print method function.
         This function prints serialized data in RDD in DStream because Scala and Java cannot
@@ -56,7 +56,7 @@ def print_(self):
         Call DStream.print().
         """
         # a hack to call print function in DStream
-        getattr(self._jdstream, "print")()
+        getattr(self._jdstream, "print")(label)

     def filter(self, f):
         """
@@ -230,6 +230,7 @@ def pyprint(self):

         """
         def takeAndPrint(rdd, time):
+            print "take and print ==================="
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
@@ -242,11 +243,24 @@ def takeAndPrint(rdd, time):

         self.foreachRDD(takeAndPrint)

-    #def transform(self, func):
+    #def transform(self, func): - TD
     #    from utils import RDDFunction
     #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
     #    jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
-    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
+    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
+
+    def _test_output(self, buff):
+        """
+        This function is only for testcases.
+        Store data from the dstream in a buffer to verify the result in the testcase.
+        """
+        def get_output(rdd, time):
+            taken = rdd.take(11)
+            buff.result = taken
+        self.foreachRDD(get_output)
+
+    def output(self):
+        self._jdstream.outputToFile()


 class PipelinedDStream(DStream):
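The new _test_output helper registers a foreachRDD callback that copies rdd.take(11) into a caller-supplied buffer object. The sketch below imitates that capture pattern with a fake stream in plain Python (FakeDStream, foreachRDD, and the list slice are stand-ins; nothing here touches Spark):

class buff:
    result = None

class FakeDStream(object):
    """Stand-in for a DStream: stores callbacks and replays batches to them."""
    def __init__(self, batches):
        self.batches = batches
        self.callbacks = []

    def foreachRDD(self, func):
        self.callbacks.append(func)

    def run(self):
        for time, batch in enumerate(self.batches):
            for func in self.callbacks:
                func(batch, time)

def _test_output(stream, buff):
    # mirror of the new DStream._test_output: stash the first 11 records of each batch
    def get_output(rdd, time):
        buff.result = rdd[:11]
    stream.foreachRDD(get_output)

stream = FakeDStream([[1, 2, 3], [4, 5, 6]])
_test_output(stream, buff)
stream.run()
print(buff.result)   # [4, 5, 6] -- each batch overwrites the previous result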

python/pyspark/streaming_tests.py (+57 -5)

@@ -19,12 +19,13 @@
 Unit tests for PySpark; additional tests are implemented as doctests in
 individual modules.

-This file will merged to tests.py. But for now, this file is separated to
-focus to streaming test case
+This file will be merged into tests.py. For now, it is kept separate to
+focus on the streaming test cases.

 """
 from fileinput import input
 from glob import glob
+from itertools import chain
 import os
 import re
 import shutil
@@ -41,18 +42,69 @@

 SPARK_HOME = os.environ["SPARK_HOME"]

+class buff:
+    """
+    Buffer to store the output from the stream
+    """
+    result = None

 class PySparkStreamingTestCase(unittest.TestCase):
-
     def setUp(self):
-        self._old_sys_path = list(sys.path)
+        print "set up"
         class_name = self.__class__.__name__
         self.ssc = StreamingContext(appName=class_name, duration=Seconds(1))

     def tearDown(self):
+        print "tear down"
         self.ssc.stop()
-        sys.path = self._old_sys_path
+        time.sleep(10)
+
+class TestBasicOperationsSuite(PySparkStreamingTestCase):
+    def setUp(self):
+        PySparkStreamingTestCase.setUp(self)
+        buff.result = None
+        self.timeout = 10  # seconds
+
+    def tearDown(self):
+        PySparkStreamingTestCase.tearDown(self)
+
+    def test_map(self):
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+        def test_func(dstream):
+            return dstream.map(lambda x: str(x))
+        expected = map(str, test_input)
+        output = self.run_stream(test_input, test_func)
+        self.assertEqual(output, expected)
+
+    def test_flatMap(self):
+        test_input = [range(1, 5), range(5, 9), range(9, 13)]
+        def test_func(dstream):
+            return dstream.flatMap(lambda x: (x, x * 2))
+        # Maybe there is a better way to build the expected flatMap output
+        excepted = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
+                       test_input)
+        output = self.run_stream(test_input, test_func)
+
+    def run_stream(self, test_input, test_func):
+        # Generate input stream with user-defined input
+        test_input_stream = self.ssc._testInputStream(test_input)
+        # Apply test function to stream
+        test_stream = test_func(test_input_stream)
+        # Add job to get output from stream
+        test_stream._test_output(buff)
+        self.ssc.start()

+        start_time = time.time()
+        while True:
+            current_time = time.time()
+            # check time out
+            if (current_time - start_time) > self.timeout:
+                self.ssc.stop()
+                break
+            self.ssc.awaitTermination(50)
+            if buff.result is not None:
+                break
+        return buff.result

 if __name__ == "__main__":
     unittest.main()
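For reference, the chain.from_iterable expression used in test_flatMap expands each element x of a batch into the pair x, x * 2 and flattens the result, matching dstream.flatMap(lambda x: (x, x * 2)). Evaluated on the first test batch (pure Python, runnable on its own) it gives:

from itertools import chain

batch = range(1, 5)   # the first test batch: 1, 2, 3, 4
expected = list(chain.from_iterable(map(lambda y: [y, y * 2], batch)))
print(expected)       # [1, 2, 2, 4, 3, 6, 4, 8]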

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala (+9)

@@ -54,6 +54,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     dstream.print()
   }

+  def print(label: String = null): Unit = {
+    dstream.print(label)
+  }
+
+  def outputToFile(): Unit = {
+    dstream.outputToFile()
+  }
+
+
   /**
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala (+8 -1)

@@ -17,9 +17,14 @@

 package org.apache.spark.streaming.api.python

+import java.io._
+import java.io.{ObjectInputStream, IOException}
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+

 import org.apache.spark._
 import org.apache.spark.rdd.RDD
@@ -51,6 +56,8 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
+        logInfo("RDD ID in python DStream ===========")
+        logInfo("RDD id " + rdd.id)
         val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
@@ -152,7 +159,7 @@ DStream[Array[Byte]](prev.ssc){
     val pairwiseRDD = new PairwiseRDD(rdd)
     /*
      * Since python operation is executed by Scala after StreamingContext.start.
-     * What PairwiseDStream does is equivalent to following python code in pySpark.
+     * What PythonPairwiseDStream does is equivalent to the following python code in pySpark.
      *
      * with _JavaStackTrace(self.context) as st:
      *    pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala (+17)

@@ -623,6 +623,23 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }

+
+  def print(label: String = null) {
+    def foreachFunc = (rdd: RDD[T], time: Time) => {
+      val first11 = rdd.take(11)
+      println ("-------------------------------------------")
+      println ("Time: " + time)
+      println ("-------------------------------------------")
+      if (label != null) {
+        println (label)
+      }
+      first11.take(10).foreach(println)
+      if (first11.size > 10) println("...")
+      println()
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+
   /**
    * Return a new DStream in which each RDD contains all the elements in seen in a
    * sliding window of time over this DStream. The new DStream generates RDDs with
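The new print(label) overload keeps the existing print() layout and only inserts the label line when one is given. As a rough guide to the console output it produces, here is the same logic transcribed into plain Python (a list stands in for the RDD, slicing for take, and the time string is only an illustrative value):

def labeled_print(records, time, label=None):
    first11 = records[:11]   # stands in for rdd.take(11)
    print("-------------------------------------------")
    print("Time: " + str(time))
    print("-------------------------------------------")
    if label is not None:
        print(label)
    for record in first11[:10]:
        print(record)
    if len(first11) > 10:
        print("...")
    print("")

labeled_print(list(range(1, 15)), "1405000000000 ms", label="mapped stream")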
