
Commit b8d7d24

implemented reduce and count function in Dstream
1 parent b6468e6 commit b8d7d24

2 files changed: +19 -10 lines changed

examples/src/main/python/streaming/network_wordcount.py

Lines changed: 2 additions & 0 deletions
@@ -19,5 +19,7 @@
 reduced_lines = mapped_lines.reduceByKey(add)
 
 reduced_lines.pyprint()
+count_lines = mapped_lines.count()
+count_lines.pyprint()
 ssc.start()
 ssc.awaitTermination()
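
The two added lines attach a second output to the example: mapped_lines.count() produces a DStream whose single element per batch is the number of records in that batch, and pyprint() prints it every interval alongside the per-word totals. A rough plain-Python sketch of what one batch yields (no Spark here; the (word, 1) shape of mapped_lines is assumed from the reduceByKey(add) call above):

from operator import add

# One batch of input, as the socket stream might deliver it.
lines = ["to be or not to be"]
mapped_lines = [(word, 1) for line in lines for word in line.split(" ")]

# reduceByKey(add): per-word totals for this batch.
reduced_lines = {}
for word, n in mapped_lines:
    reduced_lines[word] = add(reduced_lines.get(word, 0), n)
# {'to': 2, 'be': 2, 'or': 1, 'not': 1}

# count(): one number per batch -- how many elements mapped_lines held.
count_lines = len(mapped_lines)  # 6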

python/pyspark/streaming/dstream.py

Lines changed: 17 additions & 10 deletions
@@ -22,25 +22,23 @@ def count(self):
         """
 
         """
-        pass
-        #TODO: make sure count implementation, thiis different from what pyspark does
-        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        # TODO: make sure count implementation, this different from what pyspark does
+        return self._mapPartitions(lambda i: [sum(1 for _ in i)])._sum()
 
     def _sum(self):
         """
         """
-        pass
-        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
 
     def print_(self):
         """
-        Since print is reserved name for python, we cannot make a print method function.
+        Since print is reserved name for python, we cannot define a print method function.
         This function prints serialized data in RDD in DStream because Scala and Java cannot
-        deserialized pickled python object. Please use DStream.pyprint() instead to print result.
+        deserialized pickled python object. Please use DStream.pyprint() instead to print results.
 
         Call DStream.print().
         """
-        #hack to call print function in DStream
+        # a hack to call print function in DStream
         getattr(self._jdstream, "print")()
 
     def filter(self, f):
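
count() is now built from the existing partition machinery rather than left as a stub: each partition is first collapsed to its local element count via _mapPartitions, and _sum() then folds those partial counts together with operator.add through reduce(). A plain-Python sketch of that two-step logic (lists stand in for the partitions of one batch; no Spark involved):

from operator import add
from functools import reduce as fold

# One batch split across three partitions.
partitions = [["apple", "banana"], ["cherry"], []]

# Step 1 (_mapPartitions): count each partition locally.
partial_counts = [sum(1 for _ in part) for part in partitions]   # [2, 1, 0]

# Step 2 (_sum, i.e. reduce with operator.add): fold the partials into one total.
total = fold(add, partial_counts)                                 # 3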
@@ -79,17 +77,23 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         return PipelinedDStream(self, f, preservesPartitioning)
 
+    def reduce(self, func):
+        """
+
+        """
+        return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1])
+
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function.
 
         This will also perform the merging locally on each mapper before
-        sending resuls to reducer, similarly to a "combiner" in MapReduce.
+        sending results to reducer, similarly to a "combiner" in MapReduce.
 
         Output will be hash-partitioned with C{numPartitions} partitions, or
         the default parallelism level if C{numPartitions} is not specified.
         """
-        return self.combineByKey(lambda x:x, func, func, numPartitions)
+        return self.combineByKey(lambda x: x, func, func, numPartitions)
 
     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                      numPartitions = None):
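
The new reduce() leans on reduceByKey() rather than introducing a separate code path: every element is keyed under None, reduceByKey(func, 1) folds the whole batch onto that single key in one partition, and the final map() strips the dummy key off again. The same trick in plain Python, using operator.add as the reduce function (illustration only, not the DStream API):

from operator import add
from functools import reduce as fold

batch = [1, 2, 3, 4]
keyed = [(None, x) for x in batch]                 # map(lambda x: (None, x))
merged = (None, fold(add, [v for _, v in keyed]))  # reduceByKey(add, 1): one key, one partition
result = merged[1]                                 # map(lambda x: x[1]) -> 10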
@@ -99,6 +103,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
+
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -116,6 +121,7 @@ def combineLocally(iterator):
             return combiners.iteritems()
         locally_combined = self._mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
+
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
@@ -124,6 +130,7 @@ def _mergeCombiners(iterator):
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
+
         return shuffled._mapPartitions(_mergeCombiners)
 
     def partitionBy(self, numPartitions, partitionFunc=None):
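
The blank lines added in combineByKey only separate its two phases visually, but they mark the shape that reduceByKey (and therefore the new reduce and count) rides on: combine values per key inside each partition first, shuffle by key with partitionBy, then merge the per-partition combiners. A plain-Python sketch of those two phases for a word-count batch (dictionaries stand in for the combiner maps; hypothetical data, no Spark):

def combine_locally(pairs, create, merge_value):
    # Phase 1: build one combiner per key within a single partition.
    combiners = {}
    for k, v in pairs:
        combiners[k] = merge_value(combiners[k], v) if k in combiners else create(v)
    return combiners

def merge_combiners_across(parts, merge_combiners):
    # Phase 2: after the shuffle, merge combiners for the same key across partitions.
    merged = {}
    for part in parts:
        for k, c in part.items():
            merged[k] = merge_combiners(merged[k], c) if k in merged else c
    return merged

# Two partitions of (word, 1) pairs from one batch.
p1 = [("to", 1), ("be", 1), ("to", 1)]
p2 = [("be", 1), ("or", 1)]

local = [combine_locally(p, lambda v: v, lambda a, b: a + b) for p in (p1, p2)]
totals = merge_combiners_across(local, lambda a, b: a + b)   # {'to': 2, 'be': 2, 'or': 1}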
