
Commit 571d52d

Ken Takagiwa authored and committed
added reducedByKey not working yet
1 parent 5720979 commit 571d52d

3 files changed: +119 −2 lines changed
@@ -1,22 +1,30 @@
 import sys
 from operator import add
 
+from pyspark.conf import SparkConf
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.duration import *
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
         print >> sys.stderr, "Usage: wordcount <directory>"
         exit(-1)
-    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    conf = SparkConf()
+    conf.setAppName("PythonStreamingWordCount")
+    conf.set("spark.default.parallelism", 1)
+
+    # ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))
+    ssc = StreamingContext(conf=conf, duration=Seconds(1))
 
     lines = ssc.textFileStream(sys.argv[1])
     fm_lines = lines.flatMap(lambda x: x.split(" "))
     filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
     mapped_lines = fm_lines.map(lambda x: (x, 1))
+    reduced_lines = mapped_lines.reduce(add)
 
     fm_lines.pyprint()
     filtered_lines.pyprint()
     mapped_lines.pyprint()
+    reduced_lines.pyprint()
     ssc.start()
     ssc.awaitTermination()
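As the commit title notes, reduceByKey is not working yet, so the example folds the (word, 1) pairs with a plain reduce(add) for now. Once the per-key path works, the counting part of this example would presumably look something like the sketch below (hypothetical usage mirroring the batch RDD reduceByKey API; not part of this commit):

    # Hypothetical per-key count once DStream.reduceByKey works in this branch
    # (mirrors the batch RDD reduceByKey API; not part of this commit).
    counted_lines = mapped_lines.reduceByKey(add)  # one (word, count) pair per word
    counted_lines.pyprint()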

python/pyspark/streaming/dstream.py

+25-1
@@ -9,6 +9,7 @@
 
 __all__ = ["DStream"]
 
+
 class DStream(object):
     def __init__(self, jdstream, ssc, jrdd_deserializer):
         self._jdstream = jdstream
@@ -69,7 +70,7 @@ def _combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         """
         """
         if numPartitions is None:
-            numPartitions = self.ctx._defaultParallelism()
+            numPartitions = self._defaultReducePartitions()
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -130,8 +131,31 @@ def add_shuffle_key(split, iterator):
         return dstream
 
     def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+        """
+
+        """
         return PipelinedDStream(self, f, preservesPartitioning)
 
+    def _defaultReducePartitions(self):
+        """
+
+        """
+        # hard code to avoid the error
+        return 2
+        if self.ctx._conf.contains("spark.default.parallelism"):
+            return self.ctx.defaultParallelism
+        else:
+            return self.getNumPartitions()
+
+    def getNumPartitions(self):
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jdstream.partitions().size()
+
 
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
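Note that the hard-coded "return 2" makes the rest of _defaultReducePartitions unreachable; per the comment it is a temporary workaround ("hard code to avoid the error"). For clarity, the intended fallback once that line is removed would presumably be the following sketch, which only restates the unreachable code from the diff:

    # Intended behaviour once the temporary "return 2" workaround is removed
    # (a sketch based on the unreachable lines in this commit, not new logic).
    def _defaultReducePartitions(self):
        if self.ctx._conf.contains("spark.default.parallelism"):
            # honor an explicit setting, e.g.
            # conf.set("spark.default.parallelism", 1) in the example script
            return self.ctx.defaultParallelism
        else:
            # otherwise match the partitioning of this DStream's underlying RDDs
            return self.getNumPartitions()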

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

+85
@@ -55,6 +55,91 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
+<<<<<<< HEAD
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
+=======
+  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  /**
+   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
+   * operator, so this PythonDStream will be registered as an output stream and there materialized.
+   * Since serialized Python object is readable by Python, pyprint writes out binary data to
+   * temporary file and run python script to deserialized and print the first ten elements
+   */
+  private[streaming] def ppyprint() {
+    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
+      val iter = rdd.take(11).iterator
+
+      // make a temporary file
+      val prefix = "spark"
+      val suffix = ".tmp"
+      val tempFile = File.createTempFile(prefix, suffix)
+      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
+      //write out serialized python object
+      PythonRDD.writeIteratorToStream(iter, tempFileStream)
+      tempFileStream.close()
+
+      // This value has to be passed from python
+      //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
+      //absolute path to the python script is needed to change because we do not use pysparkstreaming
+      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
+      val workerEnv = pb.environment()
+
+      //envVars also need to be pass
+      //workerEnv.putAll(envVars)
+      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+      workerEnv.put("PYTHONPATH", pythonPath)
+      val worker = pb.start()
+      val is = worker.getInputStream()
+      val isr = new InputStreamReader(is)
+      val br = new BufferedReader(isr)
+
+      println ("-------------------------------------------")
+      println ("Time: " + time)
+      println ("-------------------------------------------")
+
+      //print value from python std out
+      var line = ""
+      breakable {
+        while (true) {
+          line = br.readLine()
+          if (line == null) break()
+          println(line)
+        }
+      }
+      //delete temporary file
+      tempFile.delete()
+      println()
+
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+}
+
+
+private class PairwiseDStream(prev: DStream[Array[Byte]]) extends
+  DStream[(Long, Array[Byte])](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) => Some(rdd)
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        Some(pairwiseRDD.asJavaPairRDD.rdd)
+      case None => None
+    }
+  }
+  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
+
+
+
+
+
+>>>>>>> added reducedByKey not working yet
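For orientation, the ppyprint comment above describes the flow: serialize the first elements of each RDD to a temp file, then launch a Python process to deserialize and print them. A rough, hypothetical sketch of what that pyprint.py helper might look like follows; the helper's contents are not part of this diff, and the length-prefixed framing and batch handling are assumptions based on how PythonRDD.writeIteratorToStream writes byte arrays:

    # pyprint.py (hypothetical sketch; not part of this commit)
    # Reads length-prefixed pickled records from the temp file written by
    # PythonRDD.writeIteratorToStream and prints up to the first ten elements.
    import sys
    import struct
    import pickle

    def read_records(f):
        while True:
            header = f.read(4)
            if len(header) < 4:
                return
            (length,) = struct.unpack(">i", header)
            obj = pickle.loads(f.read(length))
            # a record may be a pickled batch (list) of elements
            if isinstance(obj, list):
                for item in obj:
                    yield item
            else:
                yield obj

    if __name__ == "__main__":
        count = 0
        with open(sys.argv[1], "rb") as f:
            for element in read_records(f):
                if count >= 10:
                    print("...")
                    break
                print(element)
                count += 1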
