@@ -17,14 +17,11 @@
 
 package org.apache.spark.streaming.dstream
 
-
-import java.io._
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-import scala.util.control.Breaks._
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
@@ -34,7 +31,6 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.Duration
-import org.apache.spark.api.python.PythonRDD
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -562,11 +558,9 @@ abstract class DStream[T: ClassTag] (
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
 
-    // serialized python
     val cleanedF = context.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 1)
-      // if transformfunc is fine, it is okay
       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
     }
     new TransformedDStream[U](Seq(this), realTransformFunc)
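
For context, a minimal sketch of how the transform overload touched by the last hunk is typically driven from user code. The socket source, batch interval, local master, and the TransformSketch/stamped names below are illustrative assumptions, not part of this patch; only transform itself, which internally wraps the cleaned closure into realTransformFunc and a TransformedDStream as shown above, comes from the diff.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransformSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Hypothetical input; any DStream[String] would do here.
    val lines = ssc.socketTextStream("localhost", 9999)

    // transform hands each batch's RDD and its batch time to an arbitrary
    // RDD-to-RDD function; this is the code path shown in the hunk above.
    val stamped = lines.transform { (rdd: RDD[String], time: Time) =>
      rdd.map(line => (time.milliseconds, line))
    }

    stamped.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The false passed to SparkContext.clean in the hunk is relevant here: as the comment in the diff notes, a closure wired into the DStream graph cannot be proactively checked for serializability, so the clean-time check is skipped.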