Skip to content

Commit a32f0fb

Browse files
jerryshaotdas
authored andcommitted
[SPARK-2103][Streaming] Change to ClassTag for KafkaInputDStream and fix reflection issue
This PR updates previous Manifest for KafkaInputDStream's Decoder to ClassTag, also fix the problem addressed in [SPARK-2103](https://issues.apache.org/jira/browse/SPARK-2103). Previous Java interface cannot actually get the type of Decoder, so when using this Manifest to reconstruct the decode object will meet reflection exception. Also for other two Java interfaces, ClassTag[String] is useless because calling Scala API will get the right implicit ClassTag. Current Kafka unit test cannot actually verify the interface. I've tested these interfaces in my local and distribute settings. Author: jerryshao <saisai.shao@intel.com> Closes #1508 from jerryshao/SPARK-2103 and squashes the following commits: e90c37b [jerryshao] Add Mima excludes 7529810 [jerryshao] Change Manifest to ClassTag for KafkaInputDStream's Decoder and fix Decoder construct issue when using Java API
1 parent 284771e commit a32f0fb

File tree

3 files changed

+18
-19
lines changed

3 files changed

+18
-19
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.streaming.kafka
1919

2020
import scala.collection.Map
21-
import scala.reflect.ClassTag
21+
import scala.reflect.{classTag, ClassTag}
2222

2323
import java.util.Properties
2424
import java.util.concurrent.Executors
@@ -48,8 +48,8 @@ private[streaming]
4848
class KafkaInputDStream[
4949
K: ClassTag,
5050
V: ClassTag,
51-
U <: Decoder[_]: Manifest,
52-
T <: Decoder[_]: Manifest](
51+
U <: Decoder[_]: ClassTag,
52+
T <: Decoder[_]: ClassTag](
5353
@transient ssc_ : StreamingContext,
5454
kafkaParams: Map[String, String],
5555
topics: Map[String, Int],
@@ -66,8 +66,8 @@ private[streaming]
6666
class KafkaReceiver[
6767
K: ClassTag,
6868
V: ClassTag,
69-
U <: Decoder[_]: Manifest,
70-
T <: Decoder[_]: Manifest](
69+
U <: Decoder[_]: ClassTag,
70+
T <: Decoder[_]: ClassTag](
7171
kafkaParams: Map[String, String],
7272
topics: Map[String, Int],
7373
storageLevel: StorageLevel
@@ -103,10 +103,10 @@ class KafkaReceiver[
103103
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
104104
}
105105

106-
val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
106+
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
107107
.newInstance(consumerConfig.props)
108108
.asInstanceOf[Decoder[K]]
109-
val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
109+
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
110110
.newInstance(consumerConfig.props)
111111
.asInstanceOf[Decoder[V]]
112112

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ object KafkaUtils {
6565
* in its own thread.
6666
* @param storageLevel Storage level to use for storing the received objects
6767
*/
68-
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest](
68+
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
6969
ssc: StreamingContext,
7070
kafkaParams: Map[String, String],
7171
topics: Map[String, Int],
@@ -89,8 +89,6 @@ object KafkaUtils {
8989
groupId: String,
9090
topics: JMap[String, JInt]
9191
): JavaPairReceiverInputDStream[String, String] = {
92-
implicit val cmt: ClassTag[String] =
93-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
9492
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
9593
}
9694

@@ -111,8 +109,6 @@ object KafkaUtils {
111109
topics: JMap[String, JInt],
112110
storageLevel: StorageLevel
113111
): JavaPairReceiverInputDStream[String, String] = {
114-
implicit val cmt: ClassTag[String] =
115-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
116112
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
117113
storageLevel)
118114
}
@@ -140,13 +136,11 @@ object KafkaUtils {
140136
topics: JMap[String, JInt],
141137
storageLevel: StorageLevel
142138
): JavaPairReceiverInputDStream[K, V] = {
143-
implicit val keyCmt: ClassTag[K] =
144-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
145-
implicit val valueCmt: ClassTag[V] =
146-
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
139+
implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
140+
implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
147141

148-
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
149-
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
142+
implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
143+
implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
150144

151145
createStream[K, V, U, T](
152146
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)

project/MimaExcludes.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@ object MimaExcludes {
7171
"org.apache.spark.storage.TachyonStore.putValues")
7272
) ++
7373
Seq(
74-
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
74+
ProblemFilters.exclude[MissingMethodProblem](
75+
"org.apache.spark.streaming.flume.FlumeReceiver.this"),
76+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
77+
"org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
78+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
79+
"org.apache.spark.streaming.kafka.KafkaReceiver.this")
7580
) ++
7681
Seq( // Ignore some private methods in ALS.
7782
ProblemFilters.exclude[MissingMethodProblem](

0 commit comments

Comments
 (0)