
Commit 8dcda84

committed
all tests pass if numSlices is 2 and the number of elements in each input is over 4
1 parent 795b2cd commit 8dcda84
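For context, a minimal sketch of the condition the commit message describes. The values below are invented for illustration, and the `ssc._testInputStream` call refers to the helper shown in the first diff below:

# Hypothetical test data (not part of this commit) satisfying the condition in
# the commit message: numSlices is 2 and each input holds more than 4 elements.
test_inputs = [
    [1, 2, 3, 4, 5],
    [6, 7, 8, 9, 10],
]
num_slices = 2

for test_input in test_inputs:
    # The commit message reports all tests passing only in this regime.
    assert len(test_input) > 4, "each input needs more than 4 elements"

# With a live StreamingContext `ssc` from this branch (assumed, not shown),
# the helper in the diff below would turn each list into one batch of a test
# DStream:  input_stream = ssc._testInputStream(test_inputs, numSlices=num_slices)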

2 files changed: 18 additions & 47 deletions

python/pyspark/streaming/context.py

Lines changed: 0 additions & 46 deletions
@@ -142,49 +142,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
 
     def _testInputStream(self, test_inputs, numSlices=None):
         """
-<<<<<<< HEAD
         This function is only for test.
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
-=======
-        Generate multiple files to make "stream" in Scala side for test.
-        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
-
-        QueStream maybe good way to implement this function
-        """
-        numSlices = numSlices or self._sc.defaultParallelism
-        # Calling the Java parallelize() method with an ArrayList is too slow,
-        # because it sends O(n) Py4J commands. As an alternative, serialized
-        # objects are written to a file and loaded through textFile().
-
-        tempFiles = list()
-        for test_input in test_inputs:
-            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-
-            # Make sure we distribute data evenly if it's smaller than self.batchSize
-            if "__len__" not in dir(test_input):
-                test_input = list(test_input)  # Make it a list so we can compute its length
-            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
-            if batchSize > 1:
-                serializer = BatchedSerializer(self._sc._unbatched_serializer,
-                                               batchSize)
-            else:
-                serializer = self._sc._unbatched_serializer
-            serializer.dump_stream(test_input, tempFile)
-            tempFile.close()
-            tempFiles.append(tempFile.name)
-
-        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
-        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
-                                                        jtempFiles,
-                                                        numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
-
-    def _testInputStream2(self, test_inputs, numSlices=None):
-        """
-        This is inpired by QueStream implementation. Give list of RDD and generate DStream
-        which contain the RDD.
->>>>>>> broke something
         """
         test_rdds = list()
         test_rdd_deserializers = list()

@@ -196,10 +156,4 @@ def _testInputStream2(self, test_inputs, numSlices=None):
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()
 
-<<<<<<< HEAD
         return DStream(jinput_stream, self, test_rdd_deserializers[0])
-=======
-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        dstream._test_switch_dserializer(test_rdd_deserializers)
-        return dstream
->>>>>>> broke something

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 18 additions & 1 deletion
@@ -280,4 +280,21 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
->>>>>>> broke something
+
+
+class PythonTestInputStream3(ssc_ : JavaStreamingContext)
+  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Any]] = {
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
+    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
+    Some(rdd)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
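The new PythonTestInputStream3 emits the same fixed RDD for every batch: compute derives a batch index from the batch time but does not use it yet, and the input is hardcoded to ArrayBuffer(1, 2, 3) with 2 slices. A minimal sketch of that index arithmetic, with assumed millisecond values for zeroTime and slideDuration:

# Sketch (not from this commit) of the index arithmetic in
# PythonTestInputStream3.compute:
#     index = ((validTime - zeroTime) / slideDuration - 1).toInt
zero_time = 0          # assumed stream start time, in ms
slide_duration = 1000  # assumed batch interval, in ms

for valid_time in (1000, 2000, 3000):
    index = (valid_time - zero_time) // slide_duration - 1
    print(index)  # -> 0, 1, 2: each batch time maps to a 0-based input index

Presumably index is meant to select the matching test input for each batch once the hardcoded selectedInput is replaced.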
