Commit 04af046

Ken Takagiwa authored and committed
reduceByKey is working
1 parent 3b6d7b0 commit 04af046

File tree

4 files changed: +29 -83 lines

(One changed file is a 1.53 KB binary file, not shown.)

python/pyspark/streaming/dstream.py

Lines changed: 2 additions & 4 deletions

@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            #JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                          id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                          id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
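In the old path, Python built a JavaPairDStream and called partitionBy on it from the driver; the new path hands the PythonPartitioner straight to the JVM-side PairwiseDStream, which partitions each micro-batch itself. For context, here is a minimal sketch of how reduceByKey can compose on top of this partitionBy plumbing, modeled on PySpark's batch combineByKey in rdd.py; the helper names on DStream (combineByKey, mapPartitions, partitionBy) are assumptions for illustration, not the committed code:

    # Sketch only: `self` is a DStream of (key, value) pairs.
    def reduceByKey(self, func, numPartitions=None):
        # reduce is a combine where a single value is its own combiner
        # and the same function merges values and combiners
        return self.combineByKey(lambda v: v, func, func, numPartitions)

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        def combine_locally(iterator):
            combiners = {}
            for k, v in iterator:
                if k in combiners:
                    combiners[k] = mergeValue(combiners[k], v)
                else:
                    combiners[k] = createCombiner(v)
            return combiners.items()

        # pre-aggregate within each partition, then shuffle by key through
        # the PairwiseDStream/PythonPartitioner path patched above
        locally_combined = self.mapPartitions(combine_locally)
        shuffled = locally_combined.partitionBy(numPartitions)

        def merge_combiners(iterator):
            combiners = {}
            for k, v in iterator:
                combiners[k] = mergeCombiners(combiners[k], v) if k in combiners else v
            return combiners.items()

        return shuffled.mapPartitions(merge_combiners)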

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 15 additions & 72 deletions

@@ -55,91 +55,34 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
-<<<<<<< HEAD
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 }
-=======
-  val asJavaDStream = JavaDStream.fromDStream(this)
-
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * Since serialized Python object is readable by Python, pyprint writes out binary data to
-   * temporary file and run python script to deserialized and print the first ten elements
-   */
-  private[streaming] def ppyprint() {
-    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // make a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      //write out serialized python object
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // This value has to be passed from python
-      //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-      //absolute path to the python script is needed to change because we do not use pysparkstreaming
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      //envVars also need to be pass
-      //workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
 
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
 
-      //print value from python std out
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      //delete temporary file
-      tempFile.delete()
-      println()
-
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
-}
-
-
-private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
-DStream[(Long, Array[Byte])](prev.ssc){
+private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
+DStream[Array[Byte]](prev.ssc){
   override def dependencies = List(prev)
 
   override def slideDuration: Duration = prev.slideDuration
 
-  override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={
+  override def compute(validTime:Time):Option[RDD[Array[Byte]]]={
     prev.getOrCompute(validTime) match{
       case Some(rdd)=>Some(rdd)
         val pairwiseRDD = new PairwiseRDD(rdd)
-        Some(pairwiseRDD.asJavaPairRDD.rdd)
+        /*
+         * This is equivalent to following python code
+         * with _JavaStackTrace(self.context) as st:
+         *   pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *   partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *     id(partitionFunc))
+         *   jrdd = pairRDD.partitionBy(partitioner).values()
+         *   rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
      case None => None
     }
   }
-  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
-
-
-
-
-
->>>>>>> added reducedByKey not working yet
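Net effect: the stale merge-conflict markers and the experimental ppyprint() output operator are deleted, and PairwiseDStream now owns the shuffle, partitioning each micro-batch inside its compute method. Conceptually, a DStream subclass supplies dependencies, a slideDuration, and a compute(validTime) that derives this batch's RDD from its parent's. Below is a rough Python-flavored sketch of the pattern PairwiseDStream implements; the class and method names are assumptions for illustration, not an actual PySpark class:

    # Per batch time, take the parent's RDD of (key-hash, pickled-record)
    # pairs, partition it with the supplied partitioner, then drop the
    # hash keys so only the serialized records flow downstream.
    class PairwiseStreamSketch:
        def __init__(self, prev, partitioner):
            self.prev = prev                # parent stream of keyed blobs
            self.partitioner = partitioner  # JVM-side PythonPartitioner

        def compute(self, valid_time):
            rdd = self.prev.get_or_compute(valid_time)  # parent batch, or None
            if rdd is None:
                return None
            return rdd.partitionBy(self.partitioner).values()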

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 12 additions & 7 deletions

@@ -1,3 +1,5 @@
+/*
+
 package org.apache.spark.streaming.api.python
 
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 
 import scala.reflect.ClassTag
 
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
 
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+
+    // val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    // parents.map(_.getOrCompute(validTime).orNull).to
+    // parent = parents.head.asInstanceOf[RDD]
+    // Some()
   }
-  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+*/
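This change parks the class by wrapping the whole file in a block comment: the old compute stub (Some() over a Seq of parent streams) did not compile, and the signature is being narrowed from parents: Seq[DStream[T]] to a single parent. For orientation only, a hypothetical sketch of what a single-parent transform's compute would need to do once revived; all names here (get_or_compute, self.func standing in for the pickled command) are assumptions, not the committed design:

    # Hypothetical single-parent transform: look up the parent's RDD for
    # this batch time and, if present, apply the deserialized Python
    # function to it.
    def compute(self, valid_time):
        parent_rdd = self.parent.get_or_compute(valid_time)
        if parent_rdd is None:
            return None                       # empty batch: emit nothing
        return self.func(parent_rdd, valid_time)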
