Skip to content

Optionally return schemas with AvroProducer #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,45 @@ def produce(self, **kwargs):
super(AvroProducer, self).produce(topic, value, key, **kwargs)


class AvroMessage(object):
Copy link
Contributor

@rnpridgeon rnpridgeon Jun 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually becomes fairly expensive on a per message basis. Rather than pulling the message type into python I'd suggest simply setting the additional attributes at runtime.

def decode_message(self, message, schema_attr=None):
...
if schema_attr is not None: 
    msg.__setattr__(schema_attr, key_schema_obj)
...

This should be fairly cheap although admittedly it takes away from the readability a bit. Proper documentation should be able to bridge the gap here though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it looks like the Message type fails to set the slot for __setattr__, standby

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... that's why I had to go this route. I'm open to changing it if you have a better plan, though.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rnpridgeon do you have updates on approach for this?

"""
Provides the schemas for the key and value of a message. All other calls are delegated to the original message.
"""
__slots__ = {'_key_schema', '_value_schema', '_message'}

def __init__(self, key_schema, value_schema, message):
    """Bundle a consumed Message with the Avro schemas used to decode it.

    :param key_schema: schema returned when decoding the key (may be None)
    :param value_schema: schema returned when decoding the value (may be None)
    :param message: the underlying confluent_kafka Message being wrapped
    """
    self._message = message
    self._value_schema = value_schema
    self._key_schema = key_schema

def __getattr__(self, name):
    # Only invoked when normal lookup fails, i.e. for anything other than
    # the wrapper's own slots — forward those lookups to the wrapped Message
    # (value(), key(), error(), and the rest of its API).
    return getattr(self._message, name)

@property
def key_schema(self):
    """Avro schema that was used to encode this message's ``key``.

    :return: the key schema, or None when no key was decoded
    """
    return self._key_schema

@property
def value_schema(self):
    """Avro schema that was used to encode this message's ``value``.

    :return: the value schema, or None when no value was decoded
    """
    return self._value_schema

@property
def message(self):
    """The wrapped, undecorated Message object.

    :return: the raw message this wrapper delegates to
    """
    return self._message


class AvroConsumer(Consumer):
"""
Kafka Consumer client which does avro schema decoding of messages.
Expand All @@ -103,27 +142,30 @@ def __init__(self, config, schema_registry=None):
super(AvroConsumer, self).__init__(config)
self._serializer = MessageSerializer(schema_registry)

def poll(self, timeout=None, with_schema=False):
    """
    This is an overridden method from confluent_kafka.Consumer class. This handles
    message deserialization using avro schema.

    :param float timeout: Poll timeout in seconds (default: indefinite)
    :param boolean with_schema: If true, the key_schema and value_schema are added
        as properties of the returned message by wrapping it in an AvroMessage
        (default: False)
    :returns: message object with deserialized key and value as dict objects
    :rtype: Message or AvroMessage
    """
    if timeout is None:
        timeout = -1
    message = super(AvroConsumer, self).poll(timeout)
    # Schemas stay None unless the corresponding part is actually decoded below
    # (e.g. an errored message is wrapped with both schemas set to None).
    key_schema = value_schema = None
    if message is None:
        return None
    if not message.value() and not message.key():
        # NOTE(review): empty key AND value (tombstone-like) returns the raw
        # message even when with_schema is requested — confirm this is intended.
        return message
    if not message.error():
        if message.value() is not None:
            decoded_value, value_schema = self._serializer.decode_message(message.value())
            message.set_value(decoded_value)
        if message.key() is not None:
            decoded_key, key_schema = self._serializer.decode_message(message.key())
            message.set_key(decoded_key)
    # Message fails to set attributes at runtime (no __dict__/slot), so the
    # schemas are attached via a lightweight wrapper instead of setattr.
    return AvroMessage(key_schema, value_schema, message) if with_schema else message
35 changes: 23 additions & 12 deletions confluent_kafka/avro/serializer/message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,17 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
return outf.getvalue()

# Decoder support
def _get_decoder_func(self, schema_id, payload):
def _get_decoder_func(self, schema_id, payload, schema):
"""
Returns a function that can decode an avro message. Decoders are cached once created.
:param schema_id: the schema registry id for the schema
:param payload: the binary avro payload
:param schema: the schema used to encode the message
:return:
"""
if schema_id in self.id_to_decoder_func:
return self.id_to_decoder_func[schema_id]

# fetch from schema reg
try:
schema = self.registry_client.get_by_id(schema_id)
except ClientError as e:
raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))

if schema is None:
raise SerializerError("unable to fetch schema with id %d" % (schema_id))

curr_pos = payload.tell()
if HAS_FAST:
# try to use fast avro
Expand Down Expand Up @@ -195,6 +193,18 @@ def decoder(p):
self.id_to_decoder_func[schema_id] = decoder
return self.id_to_decoder_func[schema_id]

def _load_schema(self, schema_id):
    """Fetch a schema from the schema registry by its registered id.

    :param schema_id: the schema registry id to look up
    :return: the schema object returned by the registry client
    :raises SerializerError: if the registry lookup fails or returns nothing
    """
    try:
        fetched = self.registry_client.get_by_id(schema_id)
    except ClientError as exc:
        raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(exc)))

    if fetched is not None:
        return fetched
    raise SerializerError("unable to fetch schema with id %d" % (schema_id))

def decode_message(self, message):
"""
Decode a message from kafka that has been encoded for use with
Expand All @@ -212,5 +222,6 @@ def decode_message(self, message):
magic, schema_id = struct.unpack('>bI', payload.read(5))
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte")
decoder_func = self._get_decoder_func(schema_id, payload)
return decoder_func(payload)
schema = self._load_schema(schema_id)
decoder_func = self._get_decoder_func(schema_id, payload, schema)
return decoder_func(payload), schema
15 changes: 8 additions & 7 deletions tests/avro/test_message_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ def setUp(self):
self.client = MockSchemaRegistryClient()
self.ms = MessageSerializer(self.client)

def assertMessageIsSame(self, message, expected, schema_id, schema):
    """Assert an encoded message round-trips to *expected* under *schema*.

    Checks the wire format (magic byte 0 followed by the big-endian schema id),
    then decodes and compares both the record and the schema returned by
    decode_message, which yields a (record, schema) tuple.
    """
    self.assertTrue(message)
    self.assertTrue(len(message) > 5)
    magic, sid = struct.unpack('>bI', message[0:5])
    self.assertEqual(magic, 0)
    self.assertEqual(sid, schema_id)
    decoded_msg, decoded_schema = self.ms.decode_message(message)
    self.assertTrue(decoded_msg)
    self.assertEqual(decoded_msg, expected)
    self.assertEqual(decoded_schema, schema)

def test_encode_with_schema_id(self):
adv = avro.loads(data_gen.ADVANCED_SCHEMA)
Expand All @@ -55,15 +56,15 @@ def test_encode_with_schema_id(self):
records = data_gen.BASIC_ITEMS
for record in records:
message = self.ms.encode_record_with_schema_id(schema_id, record)
self.assertMessageIsSame(message, record, schema_id)
self.assertMessageIsSame(message, record, schema_id, basic)

subject = 'test_adv'
adv_schema_id = self.client.register(subject, adv)
self.assertNotEqual(adv_schema_id, schema_id)
records = data_gen.ADVANCED_ITEMS
for record in records:
message = self.ms.encode_record_with_schema_id(adv_schema_id, record)
self.assertMessageIsSame(message, record, adv_schema_id)
self.assertMessageIsSame(message, record, adv_schema_id, adv)

def test_encode_record_with_schema(self):
topic = 'test'
Expand All @@ -73,7 +74,7 @@ def test_encode_record_with_schema(self):
records = data_gen.BASIC_ITEMS
for record in records:
message = self.ms.encode_record_with_schema(topic, basic, record)
self.assertMessageIsSame(message, record, schema_id)
self.assertMessageIsSame(message, record, schema_id, basic)

def test_decode_none(self):
""""null/None messages should decode to None"""
Expand Down