java.lang.ClassCastException for reduce api call #57
Description
The following example is the modification of StreamToTableJoinScalaIntegrationTestImplicitSerdes test:
server.createTopic(userClicksTopic)
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)
val streamsConfiguration: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
p
}
val builder = new StreamsBuilderS()
val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)
userClicksStream
.groupByKey
.reduce((_: Long, v2: Long) => v2, "my-ktable-name")
.toStream
.through(outputTopic)
.foreach((k, v) => println(k -> v))
val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
override def uncaughtException(t: Thread, e: Throwable): Unit = try {
println(s"Stream terminated because of uncaught exception .. Shutting down app", e)
e.printStackTrace
val closed = streams.close()
println(s"Exiting application after streams close ($closed)")
} catch {
case x: Exception => x.printStackTrace
} finally {
println("Exiting application ..")
System.exit(-1)
}
})
streams.start()
val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)
userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))
val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
classOf[StringDeserializer].getName,
classOf[LongDeserializer].getName,
new RecordProcessor
)
val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)
streams.close()
assertEquals(
l.sortBy(_.key),
Seq(
new KeyValue("chao", 25L),
new KeyValue("bob", 19L),
new KeyValue("dave", 56L),
new KeyValue("eve", 78L),
new KeyValue("alice", 40L),
new KeyValue("fang", 99L)
).sortBy(_.key)
)
This part of the code above:
userClicksStream
.groupByKey
.reduce((_: Long, v2: Long) => v2, "my-ktable-name")
constantly fails with the following exception:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
At the same time the same code rewritten on a pure Java API works fine.
P.S. I use the latest Release 0.1.2 of the kafka-streams-scala library. Scala 2.11.