Commit 1f68b78

WIP

1 parent c05922c

File tree

3 files changed: +14, -4 lines

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Lines changed: 2 additions & 0 deletions

@@ -312,6 +312,8 @@ private[spark] object PythonRDD extends Logging {
     } catch {
       case eof: EOFException => {}
     }
+    println("RDDDD ==================")
+    println(objs)
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
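The two added println calls are temporary debug output: they dump the deserialized objects right after the read loop that catches EOFException to detect the end of the stream, just before the objects are parallelized into a JavaRDD. For illustration only, here is a minimal Python sketch of the same read-until-EOF pattern, with plain pickle standing in for Spark's serializers (read_objects is a hypothetical helper, not part of this commit):

import pickle

def read_objects(path):
    # Read pickled objects until end-of-file, mirroring the Scala loop
    # above that treats EOFException as the end of the stream.
    objs = []
    with open(path, "rb") as f:
        while True:
            try:
                objs.append(pickle.load(f))
            except EOFError:
                break
    return objs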

examples/src/main/python/streaming/test_oprations.py
Lines changed: 7 additions & 3 deletions

@@ -9,11 +9,15 @@
 conf = SparkConf()
 conf.setAppName("PythonStreamingNetworkWordCount")
 ssc = StreamingContext(conf=conf, duration=Seconds(1))
+ssc.checkpoint("/tmp/spark_ckp")
 
-test_input = ssc._testInputStream([1,1,1,1])
-mapped = test_input.map(lambda x: (x, 1))
-mapped.pyprint()
+test_input = ssc._testInputStream([[1],[1],[1]])
+# ssc.checkpoint("/tmp/spark_ckp")
+fm_test = test_input.flatMap(lambda x: x.split(" "))
+mapped_test = fm_test.map(lambda x: (x, 1))
 
+
+mapped_test.print_()
 ssc.start()
 # ssc.awaitTermination()
 # ssc.stop()
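One thing worth flagging in the new script: the test input elements are lists ([1]), but the flatMap lambda calls x.split(" "), which only exists on strings, so this pipeline would fail at runtime as written; presumably string input is the eventual intent of this WIP. A minimal sketch of the flatMap/map word-count shape the script appears to be converging on, written against a plain RDD so it runs standalone (the names here are illustrative, not part of the commit):

from pyspark import SparkContext

sc = SparkContext("local", "wordcount_sketch")
lines = sc.parallelize(["a a b", "b c"])  # stand-in for the streaming test input
counts = (lines
          .flatMap(lambda line: line.split(" "))   # split each line into words
          .map(lambda word: (word, 1))             # pair each word with a count of 1
          .reduceByKey(lambda a, b: a + b))        # sum the counts per word
print(counts.collect())  # e.g. [('a', 2), ('b', 2), ('c', 1)]
sc.stop()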

python/pyspark/streaming/context.py
Lines changed: 5 additions & 1 deletion

@@ -146,7 +146,10 @@ def _testInputStream(self, test_input, numSlices=None):
         # Calling the Java parallelize() method with an ArrayList is too slow,
         # because it sends O(n) Py4J commands. As an alternative, serialized
         # objects are written to a file and loaded through textFile().
-        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+
+        #tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+        tempFile = open("/tmp/spark_rdd", "wb")
+
         # Make sure we distribute data evenly if it's smaller than self.batchSize
         if "__len__" not in dir(test_input):
             c = list(test_input)    # Make it a list so we can compute its length
@@ -157,6 +160,7 @@ def _testInputStream(self, test_input, numSlices=None):
         else:
             serializer = self._sc._unbatched_serializer
         serializer.dump_stream(test_input, tempFile)
+        tempFile.flush()
         tempFile.close()
         print tempFile.name
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
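Two debugging-oriented changes here: the randomly named NamedTemporaryFile is swapped for the fixed path /tmp/spark_rdd, so the serialized test input has a predictable name and can be inspected across runs, and an explicit flush() is added before close(). The flush() is redundant when close() follows immediately, since close() already flushes, but it makes the intent explicit while this write path is being debugged. A self-contained sketch of the write side of the Python-to-JVM handoff, with pickle standing in for the PySpark serializer (write_objects is a hypothetical helper, not part of the commit):

import pickle
from tempfile import NamedTemporaryFile

def write_objects(objs, path=None):
    # Dump a stream of pickled objects to a file and return its path,
    # mimicking serializer.dump_stream() feeding the JVM-side reader.
    f = open(path, "wb") if path else NamedTemporaryFile(delete=False)
    with f:
        for obj in objs:
            pickle.dump(obj, f)
        f.flush()  # make sure the bytes are on disk before the JVM opens the file
    return f.name

Pairing this with the read_objects sketch shown after the PythonRDD.scala diff above, read_objects(write_objects([1, 2, 3])) would round-trip the same objects.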
