
Support for Batch ConsumerRecord consumption. #502

Closed
KaustubhKhati opened this issue Mar 21, 2022 · 5 comments · Fixed by #598
Labels
type: enhancement New feature or request

Comments

@KaustubhKhati

Feature description

Hi,

Is it possible to add support for consuming ConsumerRecords as a batch?
Today, both List<ConsumerRecord<String, String>> and Flux<ConsumerRecord<String, String>> throw an error:

Caused by: io.micronaut.core.serialize.exceptions.SerializationException: Error deserializing object from JSON: Cannot construct instance of org.apache.kafka.clients.consumer.ConsumerRecord (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

This would be helpful when we want to consume the headers and keys of these events.
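To make the requested shape concrete, here is a minimal JDK-only sketch (nothing below is from micronaut-kafka: Rec is a hypothetical stand-in for org.apache.kafka.clients.consumer.ConsumerRecord, reduced to the key/value/headers fields this issue cares about):

```java
import java.util.List;
import java.util.Map;

public class BatchSketch {

    // Hypothetical stand-in for ConsumerRecord<String, String>: just the
    // key, value, and headers the feature request wants access to in a batch.
    record Rec(String key, String value, Map<String, String> headers) {}

    // The batch shape the issue asks for: one handler call receives the whole
    // list, so keys and headers are available alongside each value.
    static long countWithTraceHeader(List<Rec> batch) {
        return batch.stream()
                .filter(r -> r.headers().containsKey("trace-id"))
                .count();
    }
}
```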

@Crow-EH

Crow-EH commented Nov 24, 2022

Thank you @KaustubhKhati, and @dhofftgt for the PR.

I'm also impacted.

Since there's no milestone, I guess I'll try to override KafkaConsumerProcessor on my project for now. 😓

I'll let you know if it's a sane workaround.

@Crow-EH

Crow-EH commented Nov 24, 2022

There's a bunch of private and package-private members in there, so I ended up copying @dhofftgt's entire KafkaConsumerProcessor, putting it in the same package, renaming it, and adding @Replaces.

It's ugly, but it works like a charm if you're in a hurry! 😄

package io.micronaut.configuration.kafka.processor;

...

@Singleton
@Requires(beans = KafkaDefaultConfiguration.class)
@Replaces(KafkaConsumerProcessor.class)
@Internal
class PatchedKafkaConsumerProcessor
    implements ExecutableMethodProcessor<Topic>, AutoCloseable, ConsumerRegistry {
...

@joshterrelltarget

Hi, I was stumbling over this problem too, and then a member of my org at Target shared a solution:

I was able to consume a List<ConsumerRecord<ByteArray, ByteArray>> by using batch = true and specifying the key and value deserializers in the Kafka config.

kafka:
  consumers:
    default:
      key:
        deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      value:
        deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

Cheers 👋
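Worth noting the cost of this workaround: with ByteArrayDeserializer, every record's key and value arrive as raw bytes, and mapping them back to domain types becomes application code. A minimal JDK-only sketch of that manual step (the class and helper names here are illustrative, not from the thread):

```java
import java.nio.charset.StandardCharsets;

public class ManualValueDecoding {

    // With ByteArrayDeserializer configured, Kafka hands the application raw
    // byte[] payloads; turning them back into a usable type (here a UTF-8
    // String, which could then be fed to a JSON mapper) is a manual step.
    static String decodeUtf8(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }
}
```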

@Crow-EH

Crow-EH commented Dec 21, 2022

Does that mean all consumer methods, even non-batch ones, always receive ByteArray?

That's more maintainable than my workaround, but that's still pretty sad IMO.

@joshterrelltarget

joshterrelltarget commented Jan 3, 2023

I don't know if you could specify ByteArrayDeserializer and still use Micronaut's POJO conversion 🤔 But I agree, it's kind of unfortunate.

Here's a more thorough solution that still needs to be approved by the maintainers:
#598
