
Commit d7b4d6f

Ken Takagiwa authored and committed
added reducedByKey not working yet
1 parent 87438e2 commit d7b4d6f

3 files changed, +37 -6 lines changed

@@ -1,22 +1,30 @@
 import sys
 from operator import add
 
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <directory>"
         exit(-1)
-    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    conf = SparkConf()
+    conf.setAppName("PythonStreamingWordCount")
+    conf.set("spark.default.parallelism", 1)
+
+    # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.textFileStream(sys.argv[1])
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
+    reduced_lines = mapped_lines.reduce(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
     mapped_lines.pyprint()
+    reduced_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()

python/pyspark/streaming/dstream.py

+25 -2
@@ -29,6 +29,7 @@
 
 __all__ = ["DStream"]
 
+
 class DStream(object):
     def __init__(self, jdstream, ssc, jrdd_deserializer):
         self._jdstream = jdstream
@@ -149,7 +150,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         """
         """
         if numPartitions is None:
-            numPartitions = self.ctx._defaultParallelism()
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -211,7 +212,6 @@ def add_shuffle_key(split, iterator):
         return dstream
 
 
-
     def reduceByWindow(self, reduceFunc, windowDuration, slideDuration, inReduceTunc):
         """
         """
@@ -254,8 +254,31 @@ def wrapRDD(self, rdd):
         raise NotImplementedError
 
     def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+
+        """
         return PipelinedDStream(self, f, preservesPartitioning)
 
+    def _defaultReducePartitions(self):
+        """
+
+        """
+        # hard code to avoid the error
+        return 2
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
+    def getNumPartitions(self):
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jdstream.partitions().size()
+
 
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
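Two things to note in dstream.py: _combineByKey now sizes its shuffle via the new _defaultReducePartitions, which is hard-coded to return 2 for now because the conf-based fallback beneath it still errors out (the example program sets spark.default.parallelism precisely for that fallback). With _combineByKey in place, reduceByKey itself would presumably be the usual one-liner, mirroring pyspark.rdd.RDD.reduceByKey; a minimal sketch (hypothetical, not part of this commit):

    # hypothetical sketch of reduceByKey on top of the _combineByKey above:
    # the identity createCombiner seeds each key's accumulator, and func
    # serves as both mergeValue and mergeCombiners.
    def reduceByKey(self, func, numPartitions=None):
        return self._combineByKey(lambda x: x, func, func, numPartitions)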

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

+3 -3
@@ -129,7 +129,7 @@ class PythonDStream[T: ClassTag](
   }
 }
 
-/*
+
 private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
 DStream[(Long, Array[Byte])](prev.ssc){
   override def dependencies = List(prev)
@@ -144,9 +144,9 @@ DStream[(Long, Array[Byte])](prev.ssc){
       case None => None
     }
   }
-  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream(this)
+  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
-*/
+
 
 
 
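On the Scala side, the commit un-comments PairwiseDStream (removing the surrounding /* */) and builds the JavaPairDStream through the fromJavaDStream factory rather than the apply method. PairwiseDStream is what lets the JVM shuffle pickled Python records without deserializing them: the Python worker emits each record as a Long partition key (a hash of the record's key) followed by the pickled (key, value) bytes, and the JVM re-pairs and partitions on the Long alone. A minimal Python sketch of that producer-side framing (names and framing are assumptions, loosely modeled on the add_shuffle_key helper referenced in dstream.py):

    import pickle
    import struct

    def add_shuffle_key(records, num_partitions):
        # For each (key, value) record, emit an 8-byte big-endian Long
        # naming its target partition, then the pickled record itself.
        # A JVM consumer like PairwiseDStream re-pairs these frames into
        # (Long, Array[Byte]) and hash-partitions on the Long, so keys
        # never need to be unpickled in Scala.
        for k, v in records:
            yield struct.pack(">q", hash(k) % num_partitions)
            yield pickle.dumps((k, v))

Keeping the partition id as a precomputed Long makes the shuffle contract language-agnostic: a serializer change on the Python side leaves the JVM untouched.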