Skip to content

Commit 72b9738

Browse files
Ken Takagiwa
Ken Takagiwa
authored and committed
fix map function
1 parent d3ee86a commit 72b9738

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

examples/src/main/python/streaming/network_wordcount.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
2020
mapped_lines = fm_lines.map(lambda x: (x, 1))
2121
reduced_lines = mapped_lines.reduce(add)
22+
counted_lines = reduced_lines.count()
2223

2324
fm_lines.pyprint()
2425
filtered_lines.pyprint()
2526
mapped_lines.pyprint()
2627
reduced_lines.pyprint()
28+
counted_lines.pyprint()
2729
ssc.start()
2830
ssc.awaitTermination()

python/pyspark/streaming/dstream.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,14 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
2020
self.ctx = ssc._sc
2121
self._jrdd_deserializer = jrdd_deserializer
2222

23-
def generatedRDDs(self):
24-
"""
25-
// RDDs generated, marked as private[streaming] so that testsuites can access it
26-
@transient
27-
"""
28-
pass
29-
3023
def count(self):
3124
"""
3225
3326
"""
3427
#TODO make sure count implementation, thiis different from what pyspark does
35-
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum().map(lambda x: x[1])
28+
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
3629

37-
def sum(self):
30+
def _sum(self):
3831
"""
3932
"""
4033
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
@@ -65,15 +58,22 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
6558
def map(self, f, preservesPartitioning=False):
6659
"""
6760
"""
68-
def func(split, iterator): return imap(f, iterator)
69-
return PipelinedDStream(self, func, preservesPartitioning)
61+
def func(iterator): return imap(f, iterator)
62+
return self.mapPartitions(func)
63+
#return PipelinedDStream(self, func, preservesPartitioning)
7064

7165
def mapPartitions(self, f):
7266
"""
7367
"""
7468
def func(s, iterator): return f(iterator)
7569
return self.mapPartitionsWithIndex(func)
7670

71+
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
72+
"""
73+
74+
"""
75+
return PipelinedDStream(self, f, preservesPartitioning)
76+
7777
def reduce(self, func, numPartitions=None):
7878
"""
7979
@@ -92,8 +92,8 @@ def combineLocally(iterator):
9292

9393
#TODO for count operation make sure count implementation
9494
# This is different from what pyspark does
95-
if isinstance(x, int):
96-
x = ("", x)
95+
#if isinstance(x, int):
96+
# x = ("", x)
9797

9898
(k, v) = x
9999
if k not in combiners:

0 commit comments

Comments (0)