Description
Based on the code for AvroProducer, the key is serialized only if bool(key) == True, so an empty string can be passed as a key and is sent without Avro serialization.
This breaks reading with AvroConsumer, which tries to deserialize the key whenever message.key() is not None.
The serializer then raises the following error at its length check:
Traceback (most recent call last):
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2862, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-20-7566bd90b3a5>", line 1, in <module>
    m=consumer.poll()
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/confluent_kafka/avro/__init__.py", line 118, in poll
    decoded_key = self._serializer.decode_message(message.key())
  File "/Users/alfiya/miniconda3/envs/py3/lib/python3.6/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 209, in decode_message
    raise SerializerError("message is too small to decode")
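The mismatch between the two checks can be modelled without a broker. The sketch below is not the library code: the function names, the local SerializerError class, the fake schema ID, and the 5-byte header size (one magic byte plus a 4-byte schema ID) are assumptions made for illustration, and the payload is plain UTF-8 rather than real Avro encoding.

```python
import struct

HEADER_SIZE = 5  # assumption: 1 magic byte + 4-byte schema ID precede the payload


class SerializerError(Exception):
    """Local stand-in for the library's SerializerError."""


def producer_serializes(key):
    # Producer-side check as described above: truthiness, so '' and None are both skipped.
    return bool(key)


def consumer_deserializes(raw_key):
    # Consumer-side check as described above: only None is skipped.
    return raw_key is not None


def decode_message(raw):
    # Simplified length check; a real decoder would go on to parse the Avro payload.
    if len(raw) <= HEADER_SIZE:
        raise SerializerError("message is too small to decode")
    magic, schema_id = struct.unpack(">bI", raw[:HEADER_SIZE])
    return schema_id, raw[HEADER_SIZE:]


def round_trip(key):
    # Hypothetical helper: models what reaches the consumer for a given string key.
    if producer_serializes(key):
        raw = struct.pack(">bI", 0, 1) + key.encode("utf-8")  # fake header, schema ID 1
    else:
        raw = None if key is None else key.encode("utf-8")  # '' is sent as raw b''
    if consumer_deserializes(raw):
        return decode_message(raw)
    return None
```

In this model, round_trip(None) skips both sides and round_trip('k') decodes normally, while round_trip('') skips serialization on the producer side but still reaches decode_message on the consumer side, where the length check fails.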
How to reproduce
import avro.schema
import confluent_kafka.avro

producer = confluent_kafka.avro.AvroProducer(
    {
        'api.version.request': True,
        'compression.codec': 'gzip',
        'default.topic.config': {'request.required.acks': 1},
        'bootstrap.servers': 'localhost:9093',
        'schema.registry.url': 'http://localhost:8081',
    },
    default_key_schema=avro.schema.Parse('{"type": "string"}'),
    default_value_schema=avro.schema.Parse('{"type": "string"}')
)
# The empty-string key is falsy, so it is sent without Avro serialization.
producer.produce(topic='bug', value='not empty', key='')

consumer = confluent_kafka.avro.AvroConsumer({
    'group.id': 'mygroup',
    'api.version.request': True,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'bootstrap.servers': 'localhost:9093',
    'schema.registry.url': 'http://localhost:8081'
})
consumer.subscribe(['bug'])
# The key is not None, so the consumer tries to decode it and fails.
consumer.poll()
The call to consumer.poll() raises the error shown above.
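Until the two checks agree, one possible consumer-side guard is to skip decoding for empty keys. This is only a sketch: it assumes messages are read with a plain consumer and decoded by hand, since AvroConsumer.poll() raises before the caller sees the message. The helper name decode_key_safely is hypothetical, and decode stands for whatever callable performs the Avro decoding.

```python
def decode_key_safely(raw_key, decode):
    """Decode raw_key with `decode` unless it is None or empty.

    `decode` is a placeholder for the callable that turns Avro-framed
    bytes into a Python object.
    """
    if raw_key is None or len(raw_key) == 0:
        return raw_key  # nothing to decode; pass the key through unchanged
    return decode(raw_key)
```

Returning the key unchanged keeps the distinction between a missing key (None) and an empty one (b'').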
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent_kafka.version(): ('0.11.0', 720896), confluent_kafka.libversion(): ('0.11.3', 721919)
- Apache Kafka broker version: 0.10.2.1
- Client configuration: {...} provided in 'How to reproduce'
- Operating system: Mac OS X 10.13.3
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue