Skip to content

Commit d3ee86a

Browse files
Ken Takagiwa
Ken Takagiwa
authored and committed
added count operation but this implementation need double check
1 parent 15feea9 commit d3ee86a

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from collections import defaultdict
22
from itertools import chain, ifilter, imap
3+
import operator
4+
5+
import logging
36

47
from pyspark.serializers import NoOpSerializer,\
58
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -24,6 +27,18 @@ def generatedRDDs(self):
2427
"""
2528
pass
2629

30+
def count(self):
31+
"""
32+
33+
"""
34+
#TODO make sure count implementation, thiis different from what pyspark does
35+
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum().map(lambda x: x[1])
36+
37+
def sum(self):
38+
"""
39+
"""
40+
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
41+
2742
def print_(self):
2843
"""
2944
"""
@@ -63,9 +78,9 @@ def reduce(self, func, numPartitions=None):
6378
"""
6479
6580
"""
66-
return self._combineByKey(lambda x:x, func, func, numPartitions)
81+
return self.combineByKey(lambda x:x, func, func, numPartitions)
6782

68-
def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
83+
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
6984
numPartitions = None):
7085
"""
7186
"""
@@ -74,6 +89,12 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
7489
def combineLocally(iterator):
7590
combiners = {}
7691
for x in iterator:
92+
93+
#TODO for count operation make sure count implementation
94+
# This is different from what pyspark does
95+
if isinstance(x, int):
96+
x = ("", x)
97+
7798
(k, v) = x
7899
if k not in combiners:
79100
combiners[k] = createCombiner(v)

0 commit comments

Comments (0)