Commit c455c8d

Ken Takagiwa authored and committed
added reducedByKey not working yet
1 parent dc6995d commit c455c8d

File tree

2 files changed: +86 -1 lines changed

python/pyspark/streaming/dstream.py

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@ def _defaultReducePartitions(self):
         if self.ctx._conf.contains("spark.default.parallelism"):
             return self.ctx.defaultParallelism
         else:
-            return self.getNumPartitions()
+            return 2
 
     def getNumPartitions(self):
         """

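The net effect of this one-line change: when spark.default.parallelism is not set, reduce operations now default to a hardcoded 2 partitions instead of the stream's own partition count. A minimal, runnable sketch of the fallback pattern, with illustrative names that are not part of this commit:

    # Sketch of the fallback pattern behind _defaultReducePartitions().
    class MiniDStream:
        def __init__(self, conf):
            self._conf = conf  # stand-in for self.ctx._conf

        def _default_reduce_partitions(self):
            # Mirrors the changed method: use the configured parallelism
            # if present, otherwise fall back to the hardcoded 2.
            if "spark.default.parallelism" in self._conf:
                return self._conf["spark.default.parallelism"]
            return 2

        def reduce_partitions(self, num_partitions=None):
            # Reduce-style operators consult the default only when the
            # caller gives no explicit partition count.
            if num_partitions is None:
                num_partitions = self._default_reduce_partitions()
            return num_partitions

    print(MiniDStream({}).reduce_partitions())                                # 2
    print(MiniDStream({"spark.default.parallelism": 8}).reduce_partitions())  # 8
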
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 85 additions & 0 deletions
@@ -55,6 +55,7 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
+<<<<<<< HEAD
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 
@@ -133,3 +134,87 @@ DStream[(Long, Array[Byte])](prev.ssc){
   }
   val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
+=======
+  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  /**
+   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an
+   * output operator, so this PythonDStream is registered as an output stream and materialized there.
+   * Since the serialized Python objects are only readable by Python, ppyprint writes the binary data
+   * to a temporary file and runs a python script to deserialize and print the first ten elements.
+   */
+  private[streaming] def ppyprint() {
+    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
+      val iter = rdd.take(11).iterator
+
+      // make a temporary file
+      val prefix = "spark"
+      val suffix = ".tmp"
+      val tempFile = File.createTempFile(prefix, suffix)
+      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
+      // write out the serialized python objects
+      PythonRDD.writeIteratorToStream(iter, tempFileStream)
+      tempFileStream.close()
+
+      // This value has to be passed from python
+      //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
+      // the absolute path to the python script needs to change because we do not use pysparkstreaming
+      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
+      val workerEnv = pb.environment()
+
+      // envVars also needs to be passed
+      //workerEnv.putAll(envVars)
+      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+      workerEnv.put("PYTHONPATH", pythonPath)
+      val worker = pb.start()
+      val is = worker.getInputStream()
+      val isr = new InputStreamReader(is)
+      val br = new BufferedReader(isr)
+
+      println("-------------------------------------------")
+      println("Time: " + time)
+      println("-------------------------------------------")
+
+      // print values from python stdout
+      var line = ""
+      breakable {
+        while (true) {
+          line = br.readLine()
+          if (line == null) break()
+          println(line)
+        }
+      }
+      // delete the temporary file
+      tempFile.delete()
+      println()
+
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+}
+
+
+private class PairwiseDStream(prev: DStream[Array[Byte]]) extends
+DStream[(Long, Array[Byte])](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) => Some(rdd)
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        Some(pairwiseRDD.asJavaPairRDD.rdd)
+      case None => None
+    }
+  }
+  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
+
+
+
+
+
+>>>>>>> added reducedByKey not working yet
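
The ppyprint operator shells out to a pyprint.py helper that is not included in this diff, and the hunk does not compile yet: pythonExec is referenced while its definition is still commented out, consistent with the "not working yet" commit message. A plausible sketch of such a helper, assuming the temp file holds the length-framed pickled elements that PythonRDD.writeIteratorToStream produces (the framing and the script's interface are assumptions, not confirmed by this commit):

    # Hypothetical pyprint.py: print the first ten elements from the temp
    # file written on the Scala side. Assumes a framed pickle stream:
    # a 4-byte big-endian length followed by that many pickled bytes.
    import pickle
    import struct
    import sys

    def load_stream(f):
        while True:
            header = f.read(4)
            if len(header) < 4:
                return
            (length,) = struct.unpack(">i", header)
            yield pickle.loads(f.read(length))

    if __name__ == "__main__":
        with open(sys.argv[1], "rb") as f:
            for i, obj in enumerate(load_stream(f)):
                if i == 10:
                    print("...")  # rdd.take(11) fetched one extra element
                    break
                print(obj)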

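PairwiseDStream is the JVM half of a Python-side shuffle, in the same mold as PySpark's PairwiseRDD: each element arrives as a (Long, Array[Byte]) pair of key hash plus pickled record, so the JVM can partition on the Long hash without deserializing the payload. Note that the first Some(rdd) in its compute method is dead code, since a Scala case body evaluates to its last expression; together with the unresolved conflict markers, this is part of what "not working yet" refers to. A rough sketch of the Python-side pairing step such a class expects, loosely modeled on the pattern in pyspark.rdd.partitionBy (all names below are illustrative):

    # Hypothetical Python-side pairing: emit the key's hash as an 8-byte
    # big-endian signed long, then the pickled (key, value) record, so the
    # JVM side can shuffle on the Long alone.
    import pickle
    import struct

    def add_shuffle_key(iterator):
        for key, value in iterator:
            yield struct.pack(">q", hash(key))  # read back as a Long on the JVM
            yield pickle.dumps((key, value))    # opaque payload for the JVM

    # Example: two records become four interleaved byte strings.
    for blob in add_shuffle_key([("a", 1), ("b", 2)]):
        print(len(blob))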