
Commit e9fab72

added saveAsTextFiles and saveAsPickledFiles
1 parent 94f2b65 commit e9fab72

File tree: 5 files changed, +78 −14 lines changed

python/pyspark/streaming/context.py

Lines changed: 8 additions & 9 deletions
@@ -114,7 +114,7 @@ def textFileStream(self, directory):
         Create an input stream that monitors a Hadoop-compatible file system
         for new files and reads them as text files. Files must be wrriten to the
         monitored directory by "moving" them from another location within the same
-        file system. FIle names starting with . are ignored.
+        file system. File names starting with . are ignored.
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

@@ -132,8 +132,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This is inpired by QueStream implementation. Give list of RDD and generate DStream
-        which contain the RDD.
+        This function is only for test.
+        This implementation is inpired by QueStream implementation.
+        Give list of RDD to generate DStream which contains the RDD.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -142,12 +143,10 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

+        # if len(set(test_rdd_deserializers)) > 1:
+        #     raise IOError("Deserializer should be one type to run test case. "
+        #                   "See the SparkContext.parallelize to understand how to decide deserializer")
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        return dstream
-
-    def _testInputStream3(self):
-        jinput_stream = self._jvm.PythonTestInputStream3(self._jssc).asJavaDStream()
-        return DStream(jinput_stream, self, UTF8Deserializer())
+        return DStream(jinput_stream, self, test_rdd_deserializers[0])
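With this change, the test input stream reuses the deserializer of the parallelized test RDDs instead of hardcoding one, and the unused _testInputStream3 helper is dropped. A minimal sketch of how a test might drive it (the StreamingContext name ssc and the inputs are assumptions for illustration, not part of this commit):

    # hypothetical use inside a test; ssc is a StreamingContext set up by the test harness
    test_inputs = [range(1, 5), range(5, 9)]        # one Python list per batch
    dstream = ssc._testInputStream(test_inputs)     # each list is parallelized into one RDD/batch
    # the returned DStream carries the deserializer of those parallelized test RDDs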

python/pyspark/streaming/dstream.py

Lines changed: 31 additions & 4 deletions
@@ -24,6 +24,8 @@
 from pyspark.rdd import _JavaStackTrace
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
+from pyspark.streaming.utils import rddToFileName
+

 from py4j.java_collections import ListConverter, MapConverter

@@ -343,21 +345,46 @@ def mergeCombiners(a, b):
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                  numPartitions).mapValues(lambda x: ResultIterable(x))

+    def countByValue(self):
+        def countPartition(iterator):
+            counts = defaultdict(int)
+            for obj in iterator:
+                counts[obj] += 1
+            yield counts
+
+        def mergeMaps(m1, m2):
+            for (k, v) in m2.iteritems():
+                m1[k] += v
+            return m1
+
+        return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())
+
+    def saveAsTextFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsTextFile(path)
+
+        return self.foreachRDD(saveAsTextFile)
+
+    def saveAsPickledFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsPickleFile(path)
+
+        return self.foreachRDD(saveAsTextFile)

-# TODO: implement groupByKey
-# TODO: implement saveAsTextFile

 # Following operation has dependency to transform
 # TODO: impelment union
 # TODO: implement repertitions
 # TODO: implement cogroup
 # TODO: implement join
-# TODO: implement countByValue
 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin


-
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
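The three new DStream methods follow the usual Spark Streaming output pattern: countByValue merges per-partition count dictionaries, and the two save methods use foreachRDD to write each batch's RDD to a time-stamped path built by rddToFileName. A hedged usage sketch (the input stream and the /tmp paths are assumptions chosen for illustration):

    # hypothetical pipeline; ssc is a StreamingContext and the paths are illustrative
    lines = ssc.textFileStream("/tmp/stream-in")
    counts = lines.countByValue()                      # DStream of (value, count) pairs per batch
    counts.saveAsTextFiles("/tmp/out/counts", "txt")   # writes /tmp/out/counts-<batch time>.txt
    counts.saveAsPickledFiles("/tmp/out/pickled")      # writes /tmp/out/pickled-<batch time>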

python/pyspark/streaming/utils.py

Lines changed: 6 additions & 0 deletions
@@ -53,3 +53,9 @@ def msDurationToString(ms):
         return "%.1f m" % (float(ms) / minute)
     else:
         return "%.2f h" % (float(ms) / hour)
+
+def rddToFileName(prefix, suffix, time):
+    if suffix is not None:
+        return prefix + "-" + str(time) + "." + suffix
+    else:
+        return prefix + "-" + str(time)
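rddToFileName simply concatenates the prefix, the batch time, and an optional suffix. For example (the time value is invented for illustration):

    rddToFileName("counts", "txt", 1414183500000)   # -> "counts-1414183500000.txt"
    rddToFileName("counts", None, 1414183500000)    # -> "counts-1414183500000"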

python/pyspark/streaming_tests.py

Lines changed: 32 additions & 0 deletions
@@ -301,6 +301,38 @@ def f(iterator):
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)

+    def test_countByValue_batch(self):
+        """Basic operation test for DStream.countByValue with batch deserializer"""
+        test_input = [range(1, 5) + range(1, 5), range(5, 7) + range(5, 9), ["a"] * 2 + ["b"] + [""]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+                           [(5, 2), (6, 2), (7, 1), (8, 1)],
+                           [("a", 2), ("b", 1), ("", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def test_countByValue_unbatch(self):
+        """Basic operation test for DStream.countByValue with unbatch deserializer"""
+        test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 1), (2, 1), (3, 1)],
+                           [(1, 2), ("", 1)],
+                           [("a", 2), ("b", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def _sort_result_based_on_key(self, outputs):
+        for output in outputs:
+            output.sort(key=lambda x: x[0])
+
     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
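Because countByValue merges per-partition dictionaries, the order of the (value, count) pairs in each batch is not guaranteed, so both the actual and expected outputs are key-sorted before comparison. The helper's effect in isolation (data invented for illustration):

    outputs = [[(2, 2), (1, 2)], [("b", 1), ("a", 2)]]
    for output in outputs:
        output.sort(key=lambda x: x[0])
    # outputs is now [[(1, 2), (2, 2)], [("a", 2), ("b", 1)]]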

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 1 addition & 1 deletion
@@ -138,7 +138,7 @@ class PythonTransformedDStream(
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch under manual clock.
- * This implementation is close to QueStream
+ * This implementation is inspired by QueStream
  */

 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
