
Commit 74535d4

added saveAsTextFiles and saveAsPickledFiles
1 parent f76c182 commit 74535d4

File tree

python/pyspark/streaming/context.py
python/pyspark/streaming/dstream.py
python/pyspark/streaming/utils.py
python/pyspark/streaming_tests.py

4 files changed: +77 -13 lines


python/pyspark/streaming/context.py

Lines changed: 8 additions & 9 deletions
@@ -114,7 +114,7 @@ def textFileStream(self, directory):
         Create an input stream that monitors a Hadoop-compatible file system
         for new files and reads them as text files. Files must be wrriten to the
         monitored directory by "moving" them from another location within the same
-        file system. FIle names starting with . are ignored.
+        file system. File names starting with . are ignored.
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

@@ -132,8 +132,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This is inpired by QueStream implementation. Give list of RDD and generate DStream
-        which contain the RDD.
+        This function is only for test.
+        This implementation is inpired by QueStream implementation.
+        Give list of RDD to generate DStream which contains the RDD.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
@@ -142,12 +143,10 @@ def _testInputStream(self, test_inputs, numSlices=None):
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

+        # if len(set(test_rdd_deserializers)) > 1:
+        #     raise IOError("Deserializer should be one type to run test case. "
+        #                   "See the SparkContext.parallelize to understand how to decide deserializer")
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        return dstream
-
-    def _testInputStream3(self):
-        jinput_stream = self._jvm.PythonTestInputStream3(self._jssc).asJavaDStream()
-        return DStream(jinput_stream, self, UTF8Deserializer())
+        return DStream(jinput_stream, self, test_rdd_deserializers[0])
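As a hedged usage sketch (not part of this commit), this is how textFileStream is typically fed in line with the docstring touched above; the StreamingContext `ssc`, the directory paths, and the file contents are assumptions made for illustration.

    # Minimal sketch: feeding textFileStream by moving files into the monitored
    # directory. Assumes an existing StreamingContext `ssc`; paths are illustrative.
    import os

    lines = ssc.textFileStream("/tmp/monitored")    # new files are read as UTF-8 text

    # Write the file outside the monitored directory first, then move it in
    # atomically (same file system), so the stream never sees a partially
    # written file. Files whose names start with "." are ignored.
    with open("/tmp/staging/batch-0001.txt", "w") as f:
        f.write("hello\nworld\n")
    os.rename("/tmp/staging/batch-0001.txt", "/tmp/monitored/batch-0001.txt")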

python/pyspark/streaming/dstream.py

Lines changed: 31 additions & 4 deletions
@@ -24,6 +24,8 @@
 from pyspark.rdd import _JavaStackTrace
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
+from pyspark.streaming.utils import rddToFileName
+

 from py4j.java_collections import ListConverter, MapConverter

@@ -356,21 +358,46 @@ def mergeCombiners(a, b):
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                  numPartitions).mapValues(lambda x: ResultIterable(x))

+    def countByValue(self):
+        def countPartition(iterator):
+            counts = defaultdict(int)
+            for obj in iterator:
+                counts[obj] += 1
+            yield counts
+
+        def mergeMaps(m1, m2):
+            for (k, v) in m2.iteritems():
+                m1[k] += v
+            return m1
+
+        return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())
+
+    def saveAsTextFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsTextFile(path)
+
+        return self.foreachRDD(saveAsTextFile)
+
+    def saveAsPickledFiles(self, prefix, suffix=None):
+
+        def saveAsTextFile(rdd, time):
+            path = rddToFileName(prefix, suffix, time)
+            rdd.saveAsPickleFile(path)
+
+        return self.foreachRDD(saveAsTextFile)

-# TODO: implement groupByKey
-# TODO: implement saveAsTextFile

 # Following operation has dependency to transform
 # TODO: impelment union
 # TODO: implement repertitions
 # TODO: implement cogroup
 # TODO: implement join
-# TODO: implement countByValue
 # TODO: implement leftOuterJoin
 # TODO: implemtnt rightOuterJoin


-
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
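A rough usage sketch (not from this commit) showing the new methods together; it assumes an existing StreamingContext `ssc`, reuses textFileStream from context.py, and the directory names and prefixes are made up.

    # Illustrative sketch only: count words per batch and persist each batch.
    lines = ssc.textFileStream("/tmp/monitored")          # assumed input source
    words = lines.flatMap(lambda line: line.split(" "))

    # countByValue() emits (value, count) pairs for each batch interval.
    counts = words.countByValue()

    # Each batch is written under a path built by rddToFileName (see utils.py below):
    # <prefix>-<batch time> or <prefix>-<batch time>.<suffix>.
    counts.saveAsTextFiles("/tmp/out/word-counts", "txt")

    # saveAsPickledFiles goes through RDD.saveAsPickleFile, so the (value, count)
    # tuples can be read back as Python objects rather than strings.
    counts.saveAsPickledFiles("/tmp/out/word-counts-pickled")

    ssc.start()                                           # assumed to exist on this branch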

python/pyspark/streaming/utils.py

Lines changed: 6 additions & 0 deletions
@@ -53,3 +53,9 @@ def msDurationToString(ms):
         return "%.1f m" % (float(ms) / minute)
     else:
         return "%.2f h" % (float(ms) / hour)
+
+def rddToFileName(prefix, suffix, time):
+    if suffix is not None:
+        return prefix + "-" + str(time) + "." + suffix
+    else:
+        return prefix + "-" + str(time)
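For illustration (not part of this commit), the helper produces names like the following; the time argument is whatever foreachRDD passes for the batch, and the millisecond value below is made up.

    # Illustrative only: expected return values for a made-up batch time.
    from pyspark.streaming.utils import rddToFileName

    print(rddToFileName("word-counts", "txt", 1404777600000))   # word-counts-1404777600000.txt
    print(rddToFileName("word-counts", None, 1404777600000))    # word-counts-1404777600000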

python/pyspark/streaming_tests.py

Lines changed: 32 additions & 0 deletions
@@ -301,6 +301,38 @@ def f(iterator):
         output = self._run_stream(test_input, test_func, expected_output, numSlices)
         self.assertEqual(expected_output, output)

+    def test_countByValue_batch(self):
+        """Basic operation test for DStream.countByValue with batch deserializer"""
+        test_input = [range(1, 5) + range(1,5), range(5, 7) + range(5, 9), ["a"] * 2 + ["b"] + [""]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 2), (2, 2), (3, 2), (4, 2)],
+                           [(5, 2), (6, 2), (7, 1), (8, 1)],
+                           [("a", 2), ("b", 1), ("", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def test_countByValue_unbatch(self):
+        """Basic operation test for DStream.countByValue with unbatch deserializer"""
+        test_input = [range(1, 4), [1, 1, ""], ["a", "a", "b"]]
+
+        def test_func(dstream):
+            return dstream.countByValue()
+        expected_output = [[(1, 1), (2, 1), (3, 1)],
+                           [(1, 2), ("", 1)],
+                           [("a", 2), ("b", 1)]]
+        output = self._run_stream(test_input, test_func, expected_output)
+        for result in (output, expected_output):
+            self._sort_result_based_on_key(result)
+        self.assertEqual(expected_output, output)
+
+    def _sort_result_based_on_key(self, outputs):
+        for output in outputs:
+            output.sort(key=lambda x: x[0])
+
     def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
         """Start stream and return the output"""
         # Generate input stream with user-defined input
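The sort helper exists because countByValue builds each batch's result from per-partition dictionaries, so the order of the (value, count) pairs within a batch is not guaranteed; sorting both actual and expected output by key makes the comparison order-insensitive. A minimal illustration (not from this commit):

    # Illustrative only: the normalization performed by _sort_result_based_on_key.
    actual = [[(2, 2), (1, 2)]]          # one batch, pairs in arbitrary order
    expected = [[(1, 2), (2, 2)]]
    for batches in (actual, expected):
        for batch in batches:
            batch.sort(key=lambda x: x[0])
    assert actual == expected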
