Description
Did some testing with very high ParallelConsumerOptions values in a pollAndProduce loop that reads 1,000,000 records from input-topic and writes them to output-topic.
Config:
ParallelConsumerOptions(consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7f0a133d, producer=org.apache.kafka.clients.producer.KafkaProducer@241fbec, ordering=UNORDERED, commitMode=CONSUMER_ASYNCHRONOUS, maxNumberMessagesBeyondBaseCommitOffset=1000, maxMessagesToQueue=1000, numberOfThreads=100)
Error while running the pollAndProduce loop, which closes the consumer:
2020-11-26T21:33:40,069 ERROR [control] i.c.p.ParallelEoSStreamProcessor: Error from poll control thread, will attempt controlled shutdown, then rethrow. Error: Bitset too long to encode: 33944. (max: 32767)
java.lang.RuntimeException: Bitset too long to encode: 33944. (max: 32767)
	at io.confluent.parallelconsumer.OffsetSimultaneousEncoder$BitsetEncoder.<init>(OffsetSimultaneousEncoder.java:215)
	at io.confluent.parallelconsumer.OffsetSimultaneousEncoder.invoke(OffsetSimultaneousEncoder.java:102)
	at io.confluent.parallelconsumer.OffsetMapCodecManager.encodeOffsetsCompressed(OffsetMapCodecManager.java:113)
	at io.confluent.parallelconsumer.OffsetMapCodecManager.serialiseIncompleteOffsetMapToBase64(OffsetMapCodecManager.java:97)
	at io.confluent.parallelconsumer.OffsetMapCodecManager.makeOffsetMetadataPayload(OffsetMapCodecManager.java:91)
	at io.confluent.parallelconsumer.WorkManager.findCompletedEligibleOffsetsAndRemove(WorkManager.java:553)
	at io.confluent.parallelconsumer.WorkManager.hasComittableOffsets(WorkManager.java:481)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.commitOffsetsMaybe(ParallelEoSStreamProcessor.java:696)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.controlLoop(ParallelEoSStreamProcessor.java:588)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$supervisorLoop$10(ParallelEoSStreamProcessor.java:544)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
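The limit of 32767 in the error equals `Short.MAX_VALUE`, which suggests (an assumption, not confirmed from the library source) that the bitset encoder stores the bitset length in a signed 16-bit field. The sketch below is a hypothetical stand-in, not the library's `OffsetSimultaneousEncoder`: it writes a short length header followed by the bitset bytes and rejects any length that would not fit, mirroring the message in the trace.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

// Hypothetical encoder, for illustration only; not the parallel-consumer implementation.
class ShortLengthBitsetEncoder {

    // Writes the bitset length as a signed 16-bit short, then the raw bitset bytes.
    // A short can hold at most Short.MAX_VALUE (32767), so longer bitsets are rejected.
    static byte[] encode(BitSet bits, int length) {
        if (length > Short.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "Bitset too long to encode: " + length + ". (max: " + Short.MAX_VALUE + ")");
        }
        byte[] payload = bits.toByteArray();
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + payload.length);
        buffer.putShort((short) length); // safe: length <= 32767 was checked above
        buffer.put(payload);
        return buffer.array();
    }

    public static void main(String[] args) {
        BitSet completed = new BitSet();
        completed.set(0, 100); // first 100 offsets after the base are marked complete

        byte[] ok = encode(completed, Short.MAX_VALUE);
        System.out.println("encoded bytes: " + ok.length);

        // Without the check, a plain cast would silently wrap 33944 to a negative short.
        System.out.println("(short) 33944 = " + (short) 33944);

        try {
            encode(completed, 33944); // the length reported in the stack trace
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The explicit check matters because a bare `(short)` cast would not fail: it would wrap to -31592 and produce a corrupt header rather than an error.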
This happens consistently, with various bitset lengths > 32767. However, when I add a 10 ms sleep in the pollAndProduce loop, it goes away.
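A length of 33944 is far above the configured maxNumberMessagesBeyondBaseCommitOffset=1000. One way such a span could arise, assuming (not confirmed from the library source) that the bitset covers every offset from the lowest incomplete offset up to the highest offset seen, is a single slow record pinning the base offset while the 100 worker threads keep completing later offsets. The hypothetical `bitsetLength` helper below models only that assumption; a 10 ms sleep per record would slow how fast the span grows, which would be consistent with the observation above, but this remains a hypothesis.

```java
import java.util.BitSet;

// Hypothetical model of the offset span; names and logic are illustrative assumptions.
class OffsetSpanSketch {

    static final int MAX_ENCODABLE_LENGTH = Short.MAX_VALUE; // 32767, per the error message

    // completed.get(i) is true when offset i has been processed.
    // The base commit offset is the lowest offset not yet complete; the bitset must
    // cover every offset from there up to the highest offset seen so far.
    static int bitsetLength(BitSet completed, int highestSeenOffset) {
        int base = completed.nextClearBit(0);
        if (base > highestSeenOffset) {
            return 0; // everything seen is complete, so there is nothing to encode
        }
        return highestSeenOffset + 1 - base;
    }

    public static void main(String[] args) {
        BitSet completed = new BitSet();
        completed.set(0, 50_000); // offsets 0..49999 processed by the worker threads
        completed.clear(10_000);  // ...except one record that is still in flight

        int length = bitsetLength(completed, 49_999);
        System.out.println("span = " + length
                + ", encodable = " + (length <= MAX_ENCODABLE_LENGTH));
    }
}
```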
Pushed a test that should reproduce it: JorgenRingen@eb1e9be
(just duplicated TransactionAndCommitModeTest initially for simplicity)