
Commit 6f98e50

Ken Takagiwa authored and committed
reduceByKey is working
1 parent c455c8d commit 6f98e50

File tree: 4 files changed, +29 −83 lines changed

Binary file (1.53 KB) not shown.

python/pyspark/streaming/dstream.py

Lines changed: 2 additions & 4 deletions
@@ -118,11 +118,9 @@ def add_shuffle_key(split, iterator):
         keyed = PipelinedDStream(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.ctx) as st:
-            #JavaDStream
-            pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
-                                                          id(partitionFunc))
-            jdstream = pairDStream.partitionBy(partitioner).values()
+                                                          id(partitionFunc))
+            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
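
For orientation, here is a minimal end-to-end sketch of the reduceByKey path this change is meant to support. It assumes the prototype exposes a StreamingContext/DStream API along the lines of what pyspark.streaming later shipped; the module names, socket source, and one-second batch interval are assumptions, not part of this commit.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="ReduceByKeyExample")
ssc = StreamingContext(sc, 1)  # 1-second batches

# Word counts per batch: map to (word, 1) pairs, then reduceByKey,
# which goes through the partitionBy/PairwiseDStream path patched above.
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()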

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 15 additions & 72 deletions
@@ -55,7 +55,6 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
-<<<<<<< HEAD
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 
@@ -134,87 +133,31 @@ DStream[(Long, Array[Byte])](prev.ssc){
   }
   val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
-=======
-  val asJavaDStream = JavaDStream.fromDStream(this)
-
-  /**
-   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
-   * operator, so this PythonDStream will be registered as an output stream and there materialized.
-   * Since serialized Python object is readable by Python, pyprint writes out binary data to
-   * temporary file and run python script to deserialized and print the first ten elements
-   */
-  private[streaming] def ppyprint() {
-    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
-      val iter = rdd.take(11).iterator
-
-      // make a temporary file
-      val prefix = "spark"
-      val suffix = ".tmp"
-      val tempFile = File.createTempFile(prefix, suffix)
-      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
-      //write out serialized python object
-      PythonRDD.writeIteratorToStream(iter, tempFileStream)
-      tempFileStream.close()
-
-      // This value has to be passed from python
-      //val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      //val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
-      //absolute path to the python script is needed to change because we do not use pysparkstreaming
-      val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
-      val workerEnv = pb.environment()
-
-      //envVars also need to be pass
-      //workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
-      val worker = pb.start()
-      val is = worker.getInputStream()
-      val isr = new InputStreamReader(is)
-      val br = new BufferedReader(isr)
 
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
-
-      //print value from python std out
-      var line = ""
-      breakable {
-        while (true) {
-          line = br.readLine()
-          if (line == null) break()
-          println(line)
-        }
-      }
-      //delete temporary file
-      tempFile.delete()
-      println()
 
-    }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
-  }
-}
-
-
-private class PairwiseDStream(prev:DStream[Array[Byte]]) extends
-DStream[(Long, Array[Byte])](prev.ssc){
+private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
+DStream[Array[Byte]](prev.ssc){
   override def dependencies = List(prev)
 
   override def slideDuration: Duration = prev.slideDuration
 
-  override def compute(validTime:Time):Option[RDD[(Long, Array[Byte])]]={
+  override def compute(validTime:Time):Option[RDD[Array[Byte]]]={
     prev.getOrCompute(validTime) match{
       case Some(rdd)=>Some(rdd)
         val pairwiseRDD = new PairwiseRDD(rdd)
-        Some(pairwiseRDD.asJavaPairRDD.rdd)
+        /*
+         * This is equivalent to following python code
+         * with _JavaStackTrace(self.context) as st:
+         *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+         *     partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
+         *                                                   id(partitionFunc))
+         *     jrdd = pairRDD.partitionBy(partitioner).values()
+         *     rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
+         */
+        Some(pairwiseRDD.asJavaPairRDD.partitionBy(partitioner).values().rdd)
      case None => None
    }
  }
-  val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+  val asJavaDStream = JavaDStream.fromDStream(this)
+  //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
 }
-
-
-
-
-
->>>>>>> added reducedByKey not working yet
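
As a plain-Python illustration (no Spark required; everything below is a simplified stand-in for what the PairwiseDStream, PythonPartitioner, and values() chain does per batch): each key is hashed to a partition, and values are then reduced per key within the shuffled data. The function and variable names are made up for this sketch.

from collections import defaultdict

def partition_and_reduce(pairs, num_partitions, reduce_func):
    # Assign each (key, value) pair to a partition by key hash,
    # mimicking a PythonPartitioner-style hash partitioner.
    partitions = defaultdict(list)
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))

    # Reduce values per key within each partition; since a key always
    # hashes to the same partition, the per-partition results are disjoint.
    result = {}
    for bucket in partitions.values():
        for key, value in bucket:
            result[key] = reduce_func(result[key], value) if key in result else value
    return result

print(partition_and_reduce([("a", 1), ("b", 1), ("a", 1)], 2, lambda x, y: x + y))
# prints {'a': 2, 'b': 1} (key order may vary)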

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 12 additions & 7 deletions
@@ -1,3 +1,5 @@
+/*
+
 package org.apache.spark.streaming.api.python
 
 import org.apache.spark.Accumulator
@@ -10,11 +12,8 @@ import org.apache.spark.streaming.dstream.DStream
 
 import scala.reflect.ClassTag
 
-/**
- * Created by ken on 7/15/14.
- */
 class PythonTransformedDStream[T: ClassTag](
-    parents: Seq[DStream[T]],
+    parent: DStream[T],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -30,8 +29,14 @@ class PythonTransformedDStream[T: ClassTag](
 
   //pythonDStream compute
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
-    Some()
+
+    // val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq
+    // parents.map(_.getOrCompute(validTime).orNull).to
+    // parent = parents.head.asInstanceOf[RDD]
+    // Some()
  }
-  val asJavaDStream = JavaDStream.fromDStream(this)
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
 }
+
+*/

0 commit comments