Skip to content

Commit f67cf57

Browse files
committed
added mapValues and flatMapValues WIP for glom and mapPartitions test
1 parent 953deb0 commit f67cf57

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ def context(self):
4646
"""
4747
return self._ssc
4848

49+
def context(self):
50+
"""
51+
Return the StreamingContext associated with this DStream
52+
"""
53+
return self._ssc
54+
4955
def count(self):
5056
"""
5157
Return a new DStream which contains the number of elements in this DStream.
@@ -56,7 +62,7 @@ def _sum(self):
5662
"""
5763
Add up the elements in this DStream.
5864
"""
59-
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
65+
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
6066

6167
def print_(self, label=None):
6268
"""
@@ -75,7 +81,7 @@ def filter(self, f):
7581
Return a new DStream containing only the elements that satisfy predicate.
7682
"""
7783
def func(iterator): return ifilter(f, iterator)
78-
return self._mapPartitions(func)
84+
return self.mapPartitions(func)
7985

8086
def flatMap(self, f, preservesPartitioning=False):
8187
"""
@@ -86,7 +92,7 @@ def func(s, iterator):
8692
return chain.from_iterable(imap(f, iterator))
8793
return self._mapPartitionsWithIndex(func, preservesPartitioning)
8894

89-
def map(self, f):
95+
def map(self, f, preservesPartitioning=False):
9096
"""
9197
Return a new DStream by applying a function to each element of DStream.
9298
"""
@@ -146,7 +152,7 @@ def combineLocally(iterator):
146152
else:
147153
combiners[k] = mergeValue(combiners[k], v)
148154
return combiners.iteritems()
149-
locally_combined = self._mapPartitions(combineLocally)
155+
locally_combined = self.mapPartitions(combineLocally)
150156
shuffled = locally_combined.partitionBy(numPartitions)
151157

152158
def _mergeCombiners(iterator):
@@ -474,4 +480,4 @@ def _jdstream(self):
474480
return self._jdstream_val
475481

476482
def _is_pipelinable(self):
477-
return not (self.is_cached)
483+
return not self.is_cached

0 commit comments

Comments
 (0)