Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.
This repository was archived by the owner on May 25, 2023. It is now read-only.

java.lang.ClassCastException for reduce api call #57

Closed
@dnrusakov

Description

@dnrusakov

The following example is the modification of StreamToTableJoinScalaIntegrationTestImplicitSerdes test:

server.createTopic(userClicksTopic)

val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)

val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"stream-table-join-scala-integration-test-implicit-serdes-${scala.util.Random.nextInt(100)}")
  p.put(StreamsConfig.CLIENT_ID_CONFIG, "join-scala-integration-test-implicit-serdes-standard-consumer")
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100")
  p.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)
  p
}

val builder = new StreamsBuilderS()

val userClicksStream: KStreamS[String, Long] = builder.stream(userClicksTopic)

userClicksStream
  .groupByKey
  .reduce((_: Long, v2: Long) => v2, "my-ktable-name")
  .toStream
  .through(outputTopic)
  .foreach((k, v) => println(k -> v))

val streams: KafkaStreams = new KafkaStreams(builder.build, streamsConfiguration)

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = try {
    println(s"Stream terminated because of uncaught exception .. Shutting down app", e)
    e.printStackTrace
    val closed = streams.close()
    println(s"Exiting application after streams close ($closed)")
  } catch {
    case x: Exception => x.printStackTrace
  } finally {
    println("Exiting application ..")
    System.exit(-1)
  }
})

streams.start()

val sender = MessageSender[String, Long](brokers, classOf[StringSerializer].getName, classOf[LongSerializer].getName)

userClicks.foreach(r => sender.writeKeyValue(userClicksTopic, r.key, r.value))

val listener = MessageListener(brokers, outputTopic, "join-scala-integration-test-standard-consumer",
  classOf[StringDeserializer].getName,
  classOf[LongDeserializer].getName,
  new RecordProcessor
)

val l = listener.waitUntilMinKeyValueRecordsReceived(3, 30000)

streams.close()

assertEquals(
  l.sortBy(_.key),
  Seq(
    new KeyValue("chao", 25L),
    new KeyValue("bob", 19L),
    new KeyValue("dave", 56L),
    new KeyValue("eve", 78L),
    new KeyValue("alice", 40L),
    new KeyValue("fang", 99L)
  ).sortBy(_.key)
)

This part of the code above:

userClicksStream
   .groupByKey
   .reduce((_: Long, v2: Long) => v2, "my-ktable-name")

constantly fails with the following exception:

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at scala.runtime.java8.JFunction2$mcJJJ$sp.apply(JFunction2$mcJJJ$sp.java:12)
at com.lightbend.kafka.scala.streams.KGroupedStreamS.$anonfun$reduce$3(KGroupedStreamS.scala:49)
at com.lightbend.kafka.scala.streams.FunctionConversions$ReducerFromFunction$.$anonfun$asReducer$1(FunctionConversions.scala:46)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:76)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

At the same time the same code rewritten on a pure Java API works fine.

P.S. I use the latest Release 0.1.2 of the kafka-streams-scala library. Scala 2.11.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions