
Commit 953deb0 (parent: af610d3)

Commit message: edited the comment to add more precise description

File tree

2 files changed: +0 additions, −495 deletions

python/pyspark/streaming/context.py

Lines changed: 0 additions & 36 deletions
@@ -146,7 +146,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
         This implementation is inspired by QueStream implementation.
         Give list of RDD to generate DStream which contains the RDD.
         """
-<<<<<<< HEAD
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
@@ -158,38 +157,3 @@ def _testInputStream(self, test_inputs, numSlices=None):
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

         return DStream(jinput_stream, self, test_rdd_deserializers[0])
-=======
-        self._jssc.checkpoint(directory)
-
-    def _testInputStream(self, test_inputs, numSlices=None):
-        """
-        Generate multiple files to make "stream" in Scala side for test.
-        Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
-        """
-        numSlices = numSlices or self._sc.defaultParallelism
-        # Calling the Java parallelize() method with an ArrayList is too slow,
-        # because it sends O(n) Py4J commands. As an alternative, serialized
-        # objects are written to a file and loaded through textFile().
-
-        tempFiles = list()
-        for test_input in test_inputs:
-            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-
-            # Make sure we distribute data evenly if it's smaller than self.batchSize
-            if "__len__" not in dir(test_input):
-                c = list(test_input)    # Make it a list so we can compute its length
-            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
-            if batchSize > 1:
-                serializer = BatchedSerializer(self._sc._unbatched_serializer,
-                                               batchSize)
-            else:
-                serializer = self._sc._unbatched_serializer
-            serializer.dump_stream(test_input, tempFile)
-            tempFiles.append(tempFile.name)
-
-        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
-        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
-                                                        jtempFiles,
-                                                        numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, PickleSerializer())
->>>>>>> added basic operation test cases
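The comment in the removed branch explains the workaround it implemented: rather than issuing O(n) Py4J calls through the Java parallelize() method, each test input is dumped to a temp file with a (possibly batched) serializer and rebuilt as an RDD on the Scala side. Below is a minimal sketch of that serialization step in isolation, using the Spark 1.x-era serializer names that appear in the diff; the hard-coded batchSize is illustrative, whereas the removed code derived it from numSlices and the context's batch size.

    from tempfile import NamedTemporaryFile

    from pyspark.serializers import BatchedSerializer, PickleSerializer

    # One "batch" of test input, as a plain Python list.
    test_input = list(range(10))

    # Mirror the removed code's choice: batch the serializer only when
    # the computed batch size exceeds 1 (hard-coded here for illustration).
    batchSize = 4
    if batchSize > 1:
        serializer = BatchedSerializer(PickleSerializer(), batchSize)
    else:
        serializer = PickleSerializer()

    # NamedTemporaryFile opens in binary mode by default, which dump_stream needs.
    tempFile = NamedTemporaryFile(delete=False)
    serializer.dump_stream(test_input, tempFile)
    tempFile.close()

    # The JVM side would then rebuild an RDD from this path via
    # PythonRDD.readRDDFromFile, per the removed docstring.
    print(tempFile.name)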

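For contrast, the version kept on HEAD skips the temp files entirely: it parallelizes each test input into an RDD, remembers each RDD's deserializer, and passes the JVM-side RDD handles to PythonTestInputStream. A rough sketch of that per-input preparation, assuming a local SparkContext and PySpark's private _jrdd / _jrdd_deserializer attributes (the JVM PythonTestInputStream class is test-only and omitted here):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "test-input-sketch")

    # One inner list per batch of the simulated stream.
    test_inputs = [[1, 2, 3], [4, 5], [6]]

    test_rdds = []
    test_rdd_deserializers = []
    for test_input in test_inputs:
        rdd = sc.parallelize(test_input)
        test_rdds.append(rdd._jrdd)  # JVM handle, as PythonTestInputStream expects
        test_rdd_deserializers.append(rdd._jrdd_deserializer)

    print(len(test_rdds), "batches prepared")
    sc.stop()

Note that the kept code decodes every batch with test_rdd_deserializers[0], which implicitly assumes all test inputs serialize the same way.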