
Commit b3b0362
added basic operation test cases
1 parent: 9cde7c9

File tree: 5 files changed (+48 −15 lines)

examples/src/main/python/streaming/test_oprations.py

Lines changed: 10 additions & 9 deletions
@@ -9,22 +9,23 @@
     conf = SparkConf()
     conf.setAppName("PythonStreamingNetworkWordCount")
     ssc = StreamingContext(conf=conf, duration=Seconds(1))
-
-    test_input = ssc._testInputStream([1,2,3])
-    class buff:
+    class Buff:
+        result = list()
         pass
+    Buff.result = list()
+
+    test_input = ssc._testInputStream([range(1,4), range(4,7), range(7,10)])
 
     fm_test = test_input.map(lambda x: (x, 1))
-    fm_test.test_output(buff)
+    fm_test.pyprint()
+    fm_test._test_output(Buff.result)
 
     ssc.start()
     while True:
         ssc.awaitTermination(50)
-        try:
-            buff.result
+        if len(Buff.result) == 3:
             break
-        except AttributeError:
-            pass
 
     ssc.stop()
-    print buff.result
+    print Buff.result
+
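
The test script drives a three-batch fake input stream and polls a shared list (Buff.result) until all three mapped batches have arrived. Stripped of Spark, the collector-and-poll pattern looks like the following minimal sketch; Sink and fake_stream are illustrative names, not part of the commit:

    import threading
    import time

    class Sink(object):
        """Shared collector the driver polls; plays the role of Buff above."""
        result = []

    def fake_stream(batches, sink, interval=0.05):
        # Deliver one mapped batch per interval, like the 1-second batches
        # produced by the StreamingContext in the test script.
        for batch in batches:
            time.sleep(interval)
            sink.result.append([(x, 1) for x in batch])

    batches = [range(1, 4), range(4, 7), range(7, 10)]
    threading.Thread(target=fake_stream, args=(batches, Sink)).start()
    while len(Sink.result) != 3:    # poll until all three batches arrived
        time.sleep(0.05)
    print Sink.result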

python/pyspark/streaming/context.py

Lines changed: 36 additions & 0 deletions
@@ -150,6 +150,7 @@ def _testInputStream(self, test_inputs, numSlices=None):
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
         """
+<<<<<<< HEAD
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
@@ -161,3 +162,38 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
+=======
+        self._jssc.checkpoint(directory)
+
+    def _testInputStream(self, test_inputs, numSlices=None):
+        """
+        Generate multiple files to make "stream" in Scala side for test.
+        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
+        """
+        numSlices = numSlices or self._sc.defaultParallelism
+        # Calling the Java parallelize() method with an ArrayList is too slow,
+        # because it sends O(n) Py4J commands. As an alternative, serialized
+        # objects are written to a file and loaded through textFile().
+
+        tempFiles = list()
+        for test_input in test_inputs:
+            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+
+            # Make sure we distribute data evenly if it's smaller than self.batchSize
+            if "__len__" not in dir(test_input):
+                c = list(test_input)    # Make it a list so we can compute its length
+            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
+            if batchSize > 1:
+                serializer = BatchedSerializer(self._sc._unbatched_serializer,
+                                               batchSize)
+            else:
+                serializer = self._sc._unbatched_serializer
+            serializer.dump_stream(test_input, tempFile)
+            tempFiles.append(tempFile.name)
+
+        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
+        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
+                                                        jtempFiles,
+                                                        numSlices).asJavaDStream()
+        return DStream(jinput_stream, self, PickleSerializer())
+>>>>>>> added basic operation test cases
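
Note that this hunk commits unresolved merge conflict markers (<<<<<<< HEAD through >>>>>>>): the HEAD branch builds the stream from in-memory RDDs, while the other branch serializes each test batch to a temp file that the Scala-side PythonTestInputStream replays one batch interval at a time, avoiding O(n) Py4J calls. (The second branch also carries a latent bug: c = list(test_input) is computed but len(test_input) is still taken on the original iterable.) A standalone sketch of that serialize-to-file roundtrip, assuming the era's pyspark.serializers API and an illustrative batch size of 2:

    from tempfile import NamedTemporaryFile
    from pyspark.serializers import BatchedSerializer, PickleSerializer

    serializer = BatchedSerializer(PickleSerializer(), 2)

    # Write one simulated input batch to disk instead of issuing
    # O(n) Py4J commands to the JVM.
    tempFile = NamedTemporaryFile(delete=False)
    serializer.dump_stream(range(1, 4), tempFile)
    tempFile.close()

    # The Scala PythonTestInputStream would replay this file one batch
    # interval later; deserializing it here shows the same roundtrip.
    print list(serializer.load_stream(open(tempFile.name, 'rb')))   # [1, 2, 3]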

python/pyspark/streaming/dstream.py

Lines changed: 0 additions & 1 deletion
@@ -236,7 +236,6 @@ def pyprint(self):
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
-            print "take and print ==================="
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))

python/pyspark/streaming_tests.py

Lines changed: 2 additions & 3 deletions
@@ -449,12 +449,11 @@ def tearDownClass(cls):
             current_time = time.time()
             # check time out
             if (current_time - start_time) > self.timeout:
-                self.ssc.stop()
                 break
             self.ssc.awaitTermination(50)
-            if buff.result is not None:
+            if len(expected_output) == len(StreamOutput.result):
                 break
-        return buff.result
+        return StreamOutput.result
 
 if __name__ == "__main__":
     unittest.main()
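
The harness previously treated any readable buff.result as success; it now waits until one collected batch exists per expected batch, bounded by a wall-clock timeout. The polling loop amounts to this helper (wait_for_output is an illustrative name; result stands in for the module's shared StreamOutput.result collector):

    import time

    def wait_for_output(ssc, expected_output, result, timeout=10):
        """Poll until every expected batch arrived or the timeout elapsed."""
        start_time = time.time()
        while True:
            if (time.time() - start_time) > timeout:
                break                    # timed out; return whatever was collected
            ssc.awaitTermination(50)     # block for at most 50 ms per iteration
            if len(expected_output) == len(result):
                break                    # one collected batch per expected batch
        return result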

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 0 additions & 2 deletions
@@ -55,8 +55,6 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
-        logInfo("RDD ID in python DStream ===========")
-        logInfo("RDD id " + rdd.id)
         val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
