
AvroProducer doesn't serialize the key if it's an empty string #342

Closed
@alfiya400

Description

Judging from the AvroProducer code, the producer serializes the key only if bool(key) == True. An empty string is falsy, so a key of '' is passed to Kafka as-is, without Avro serialization.
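To make the difference concrete, here is a minimal pure-Python sketch of the key check described above, set against a check on `key is not None`. It is not the library's code: `fake_serialize`, `prepare_key_buggy` and `prepare_key_fixed` are hypothetical names, and the stand-in serializer just tags the bytes instead of producing real Avro.

```python
from typing import Callable, Optional


def fake_serialize(key: str) -> bytes:
    # Hypothetical stand-in for the Avro key serializer.
    return b"AVRO:" + key.encode("utf-8")


def prepare_key_buggy(key: Optional[str], serialize: Callable[[str], bytes]):
    # Mirrors the reported behaviour: serialize only when bool(key) is True,
    # so '' falls through and is sent unserialized.
    if key:
        return serialize(key)
    return key


def prepare_key_fixed(key: Optional[str], serialize: Callable[[str], bytes]):
    # Serialize every non-None key, including the empty string.
    if key is not None:
        return serialize(key)
    return None


print(repr(prepare_key_buggy("", fake_serialize)))    # ''
print(repr(prepare_key_fixed("", fake_serialize)))    # b'AVRO:'
print(repr(prepare_key_fixed(None, fake_serialize)))  # None
```

Checking for None rather than truthiness keeps "no key" (None) distinct from "empty key" (''), which matches how the consumer side decides whether to decode.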

This breaks reading with AvroConsumer, which tries to deserialize the key whenever message.key() is not None. The unserialized empty key is not None, so the serializer fails its length check and raises the following error:

Traceback (most recent call last):
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2862, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-20-7566bd90b3a5>", line 1, in <module>
    m=consumer.poll()
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 118, in poll
    decoded_key = self._serializer.decode_message(message.key())
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 209, in decode_message
    raise SerializerError("message is too small to decode")
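The length check fails because AvroConsumer expects the Confluent wire format: a magic byte (0), a 4-byte big-endian schema ID, then the Avro binary payload. An unserialized empty key has none of that header. The sketch below is a pure-Python stand-in for a string key, not the library's MessageSerializer; the function names are hypothetical, and the exact length threshold used by decode_message may differ from the one assumed here.

```python
import struct
from typing import Optional, Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID (5 bytes)


def zigzag_varint(n: int) -> bytes:
    # Avro encodes longs with zigzag, then base-128 varint.
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    # Decode an Avro long at buf[pos]; return (value, next position).
    shift = 0
    z = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_string_message(schema_id: int, value: str) -> bytes:
    # Avro string = varint byte length + UTF-8 bytes, prefixed by the header.
    data = value.encode("utf-8")
    return HEADER.pack(MAGIC_BYTE, schema_id) + zigzag_varint(len(data)) + data


def decode_string_message(message: Optional[bytes]) -> Optional[str]:
    if message is None:
        return None  # null key: nothing to decode
    if len(message) < HEADER.size:  # assumed threshold
        raise ValueError("message is too small to decode")
    magic, _schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError("unknown magic byte")
    length, pos = read_varint(message, HEADER.size)
    return message[pos:pos + length].decode("utf-8")


print(repr(decode_string_message(encode_string_message(1, ""))))  # ''
try:
    decode_string_message(b"")
except ValueError as err:
    print(err)  # message is too small to decode
```

A properly serialized empty string is still 6 bytes on the wire (5-byte header plus a single 0x00 length byte), so it decodes fine. Only the raw zero-length key trips the check.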

How to reproduce

import avro.schema
import confluent_kafka.avro

producer = confluent_kafka.avro.AvroProducer(
    {
        'api.version.request': True,
        'compression.codec': 'gzip',
        'default.topic.config': {'request.required.acks': 1},
        'bootstrap.servers': 'localhost:9093',
        'schema.registry.url': 'http://localhost:8081',
    },
    default_key_schema=avro.schema.Parse('{"type": "string"}'),
    default_value_schema=avro.schema.Parse('{"type": "string"}'),
)
# The empty-string key is falsy, so it is sent without Avro serialization.
producer.produce(topic='bug', value='not empty', key='')
# Wait for delivery so the consumer below can actually read the message.
producer.flush()

consumer = confluent_kafka.avro.AvroConsumer({
    'group.id': 'mygroup',
    'api.version.request': True,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'bootstrap.servers': 'localhost:9093',
    'schema.registry.url': 'http://localhost:8081',
})
consumer.subscribe(['bug'])
consumer.poll()  # raises SerializerError while decoding the empty key

At consumer.poll(), the error above is raised.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka.version(): ('0.11.0', 720896), confluent_kafka.libversion(): ('0.11.3', 721919)
  • Apache Kafka broker version: 0.10.2.1
  • Client configuration: {...} provided in 'How to reproduce'
  • Operating system: Mac OS X 10.13.3
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
