Skip to content

Commit 80a13e8

Browse files
committed
Used fake class tag syntax
1 parent 26eb3f6 commit 80a13e8

File tree

2 files changed

+56
-88
lines changed

2 files changed

+56
-88
lines changed

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
139139

140140
/** Return a new DStream by applying a function to all elements of this DStream. */
141141
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
142-
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
142+
def cm: ClassTag[(K2, V2)] = fakeClassTag
143143
new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
144144
}
145145

@@ -160,7 +160,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
160160
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
161161
import scala.collection.JavaConverters._
162162
def fn = (x: T) => f.call(x).asScala
163-
def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
163+
def cm: ClassTag[(K2, V2)] = fakeClassTag
164164
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
165165
}
166166

@@ -284,8 +284,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
284284
* on each RDD of 'this' DStream.
285285
*/
286286
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
287-
implicit val cm: ClassTag[U] =
288-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
287+
implicit val cm: ClassTag[U] = fakeClassTag
288+
289289
def scalaTransform (in: RDD[T]): RDD[U] =
290290
transformFunc.call(wrapRDD(in)).rdd
291291
dstream.transform(scalaTransform(_))
@@ -296,8 +296,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
296296
* on each RDD of 'this' DStream.
297297
*/
298298
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
299-
implicit val cm: ClassTag[U] =
300-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
299+
implicit val cm: ClassTag[U] = fakeClassTag
300+
301301
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
302302
transformFunc.call(wrapRDD(in), time).rdd
303303
dstream.transform(scalaTransform(_, _))
@@ -309,10 +309,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
309309
*/
310310
def transformToPair[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
311311
JavaPairDStream[K2, V2] = {
312-
implicit val cmk: ClassTag[K2] =
313-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
314-
implicit val cmv: ClassTag[V2] =
315-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
312+
implicit val cmk: ClassTag[K2] = fakeClassTag
313+
implicit val cmv: ClassTag[V2] = fakeClassTag
314+
316315
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
317316
transformFunc.call(wrapRDD(in)).rdd
318317
dstream.transform(scalaTransform(_))
@@ -324,10 +323,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
324323
*/
325324
def transformToPair[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
326325
JavaPairDStream[K2, V2] = {
327-
implicit val cmk: ClassTag[K2] =
328-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
329-
implicit val cmv: ClassTag[V2] =
330-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
326+
implicit val cmk: ClassTag[K2] = fakeClassTag
327+
implicit val cmv: ClassTag[V2] = fakeClassTag
328+
331329
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
332330
transformFunc.call(wrapRDD(in), time).rdd
333331
dstream.transform(scalaTransform(_, _))
@@ -341,10 +339,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
341339
other: JavaDStream[U],
342340
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
343341
): JavaDStream[W] = {
344-
implicit val cmu: ClassTag[U] =
345-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
346-
implicit val cmv: ClassTag[W] =
347-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
342+
implicit val cmu: ClassTag[U] = fakeClassTag
343+
implicit val cmv: ClassTag[W] = fakeClassTag
344+
348345
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
349346
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
350347
dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
@@ -358,12 +355,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
358355
other: JavaDStream[U],
359356
transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
360357
): JavaPairDStream[K2, V2] = {
361-
implicit val cmu: ClassTag[U] =
362-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
363-
implicit val cmk2: ClassTag[K2] =
364-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
365-
implicit val cmv2: ClassTag[V2] =
366-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
358+
implicit val cmu: ClassTag[U] = fakeClassTag
359+
implicit val cmk2: ClassTag[K2] = fakeClassTag
360+
implicit val cmv2: ClassTag[V2] = fakeClassTag
367361
def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
368362
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
369363
dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
@@ -377,12 +371,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
377371
other: JavaPairDStream[K2, V2],
378372
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
379373
): JavaDStream[W] = {
380-
implicit val cmk2: ClassTag[K2] =
381-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
382-
implicit val cmv2: ClassTag[V2] =
383-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
384-
implicit val cmw: ClassTag[W] =
385-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
374+
implicit val cmk2: ClassTag[K2] = fakeClassTag
375+
implicit val cmv2: ClassTag[V2] = fakeClassTag
376+
implicit val cmw: ClassTag[W] = fakeClassTag
377+
386378
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
387379
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
388380
dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
@@ -396,14 +388,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
396388
other: JavaPairDStream[K2, V2],
397389
transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
398390
): JavaPairDStream[K3, V3] = {
399-
implicit val cmk2: ClassTag[K2] =
400-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
401-
implicit val cmv2: ClassTag[V2] =
402-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
403-
implicit val cmk3: ClassTag[K3] =
404-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
405-
implicit val cmv3: ClassTag[V3] =
406-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
391+
implicit val cmk2: ClassTag[K2] = fakeClassTag
392+
implicit val cmv2: ClassTag[V2] = fakeClassTag
393+
implicit val cmk3: ClassTag[K3] = fakeClassTag
394+
implicit val cmv3: ClassTag[V3] = fakeClassTag
407395
def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
408396
transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
409397
dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala

Lines changed: 30 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,25 @@
1717

1818
package org.apache.spark.streaming.api.java
1919

20-
import java.util.{List => JList}
2120
import java.lang.{Long => JLong}
21+
import java.util.{List => JList}
2222

2323
import scala.collection.JavaConversions._
2424
import scala.reflect.ClassTag
2525

26-
import org.apache.spark.streaming._
27-
import org.apache.spark.streaming.StreamingContext._
28-
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
29-
import org.apache.spark.Partitioner
26+
import com.google.common.base.Optional
27+
import org.apache.hadoop.conf.Configuration
3028
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
3129
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
32-
import org.apache.hadoop.conf.Configuration
33-
import org.apache.spark.api.java.{JavaUtils, JavaPairRDD}
30+
import org.apache.spark.Partitioner
31+
import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
3432
import org.apache.spark.api.java.JavaPairRDD._
35-
import org.apache.spark.storage.StorageLevel
36-
import com.google.common.base.Optional
33+
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
34+
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
3735
import org.apache.spark.rdd.RDD
38-
import org.apache.spark.rdd.PairRDDFunctions
36+
import org.apache.spark.storage.StorageLevel
37+
import org.apache.spark.streaming._
38+
import org.apache.spark.streaming.StreamingContext._
3939
import org.apache.spark.streaming.dstream.DStream
4040

4141
/**
@@ -169,8 +169,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
169169
mergeCombiners: JFunction2[C, C, C],
170170
partitioner: Partitioner
171171
): JavaPairDStream[K, C] = {
172-
implicit val cm: ClassTag[C] =
173-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
172+
implicit val cm: ClassTag[C] = fakeClassTag
174173
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
175174
}
176175

@@ -185,8 +184,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
185184
partitioner: Partitioner,
186185
mapSideCombine: Boolean
187186
): JavaPairDStream[K, C] = {
188-
implicit val cm: ClassTag[C] =
189-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
187+
implicit val cm: ClassTag[C] = fakeClassTag
190188
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
191189
}
192190

@@ -454,8 +452,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
454452
*/
455453
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
456454
: JavaPairDStream[K, S] = {
457-
implicit val cm: ClassTag[S] =
458-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
455+
implicit val cm: ClassTag[S] = fakeClassTag
459456
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
460457
}
461458

@@ -472,8 +469,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
472469
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
473470
numPartitions: Int)
474471
: JavaPairDStream[K, S] = {
475-
implicit val cm: ClassTag[S] =
476-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
472+
implicit val cm: ClassTag[S] = fakeClassTag
477473
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
478474
}
479475

@@ -491,8 +487,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
491487
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
492488
partitioner: Partitioner
493489
): JavaPairDStream[K, S] = {
494-
implicit val cm: ClassTag[S] =
495-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
490+
implicit val cm: ClassTag[S] = fakeClassTag
496491
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
497492
}
498493

@@ -502,8 +497,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
502497
* 'this' DStream without changing the key.
503498
*/
504499
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
505-
implicit val cm: ClassTag[U] =
506-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
500+
implicit val cm: ClassTag[U] = fakeClassTag
507501
dstream.mapValues(f)
508502
}
509503

@@ -525,8 +519,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
525519
* of partitions.
526520
*/
527521
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
528-
implicit val cm: ClassTag[W] =
529-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
522+
implicit val cm: ClassTag[W] = fakeClassTag
530523
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
531524
}
532525

@@ -538,8 +531,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
538531
other: JavaPairDStream[K, W],
539532
numPartitions: Int
540533
): JavaPairDStream[K, (JList[V], JList[W])] = {
541-
implicit val cm: ClassTag[W] =
542-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
534+
implicit val cm: ClassTag[W] = fakeClassTag
543535
dstream.cogroup(other.dstream, numPartitions)
544536
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
545537
}
@@ -552,8 +544,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
552544
other: JavaPairDStream[K, W],
553545
partitioner: Partitioner
554546
): JavaPairDStream[K, (JList[V], JList[W])] = {
555-
implicit val cm: ClassTag[W] =
556-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
547+
implicit val cm: ClassTag[W] = fakeClassTag
557548
dstream.cogroup(other.dstream, partitioner)
558549
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
559550
}
@@ -563,8 +554,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
563554
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
564555
*/
565556
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
566-
implicit val cm: ClassTag[W] =
567-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
557+
implicit val cm: ClassTag[W] = fakeClassTag
568558
dstream.join(other.dstream)
569559
}
570560

@@ -573,8 +563,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
573563
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
574564
*/
575565
def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = {
576-
implicit val cm: ClassTag[W] =
577-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
566+
implicit val cm: ClassTag[W] = fakeClassTag
578567
dstream.join(other.dstream, numPartitions)
579568
}
580569

@@ -586,8 +575,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
586575
other: JavaPairDStream[K, W],
587576
partitioner: Partitioner
588577
): JavaPairDStream[K, (V, W)] = {
589-
implicit val cm: ClassTag[W] =
590-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
578+
implicit val cm: ClassTag[W] = fakeClassTag
591579
dstream.join(other.dstream, partitioner)
592580
}
593581

@@ -597,8 +585,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
597585
* number of partitions.
598586
*/
599587
def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = {
600-
implicit val cm: ClassTag[W] =
601-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
588+
implicit val cm: ClassTag[W] = fakeClassTag
602589
val joinResult = dstream.leftOuterJoin(other.dstream)
603590
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
604591
}
@@ -612,8 +599,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
612599
other: JavaPairDStream[K, W],
613600
numPartitions: Int
614601
): JavaPairDStream[K, (V, Optional[W])] = {
615-
implicit val cm: ClassTag[W] =
616-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
602+
implicit val cm: ClassTag[W] = fakeClassTag
617603
val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions)
618604
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
619605
}
@@ -626,8 +612,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
626612
other: JavaPairDStream[K, W],
627613
partitioner: Partitioner
628614
): JavaPairDStream[K, (V, Optional[W])] = {
629-
implicit val cm: ClassTag[W] =
630-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
615+
implicit val cm: ClassTag[W] = fakeClassTag
631616
val joinResult = dstream.leftOuterJoin(other.dstream, partitioner)
632617
joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}
633618
}
@@ -653,8 +638,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
653638
other: JavaPairDStream[K, W],
654639
numPartitions: Int
655640
): JavaPairDStream[K, (Optional[V], W)] = {
656-
implicit val cm: ClassTag[W] =
657-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
641+
implicit val cm: ClassTag[W] = fakeClassTag
658642
val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions)
659643
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
660644
}
@@ -668,8 +652,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
668652
other: JavaPairDStream[K, W],
669653
partitioner: Partitioner
670654
): JavaPairDStream[K, (Optional[V], W)] = {
671-
implicit val cm: ClassTag[W] =
672-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
655+
implicit val cm: ClassTag[W] = fakeClassTag
673656
val joinResult = dstream.rightOuterJoin(other.dstream, partitioner)
674657
joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}
675658
}
@@ -749,8 +732,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
749732
new JavaDStream[(K, V)](dstream)
750733
}
751734

752-
override val classTag: ClassTag[(K, V)] =
753-
implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
735+
override val classTag: ClassTag[(K, V)] = fakeClassTag
754736
}
755737

756738
object JavaPairDStream {
@@ -759,10 +741,8 @@ object JavaPairDStream {
759741
}
760742

761743
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
762-
implicit val cmk: ClassTag[K] =
763-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
764-
implicit val cmv: ClassTag[V] =
765-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
744+
implicit val cmk: ClassTag[K] = fakeClassTag
745+
implicit val cmv: ClassTag[V] = fakeClassTag
766746
new JavaPairDStream[K, V](dstream.dstream)
767747
}
768748

0 commit comments

Comments
 (0)