avro consumer with explicitly specified read schema #470


Merged: 18 commits, Nov 16, 2018

12 changes: 7 additions & 5 deletions confluent_kafka/avro/__init__.py
@@ -98,9 +98,11 @@ class AvroConsumer(Consumer):
Constructor takes below parameters

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al).
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
:param optional a reader schema for the message key
:param optional a reader schema for the message value
"""
def __init__(self, config, schema_registry=None):
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
Contributor:
Suggested change
def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
def __init__(self, config, schema_registry=None, key_reader_schema=None, value_reader_schema=None):

Contributor:
The word reader actually describes the schema rather than the message fields. I suggest instead naming these parameters key_reader_schema and value_reader_schema.

Contributor Author (@fkaufer), Oct 26, 2018:
I had it that way first (or key_schema_reader, considering reader as a var index), but this would be inconsistent with default_key_schema and default_value_schema in the constructor:

See

def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):

Furthermore, I consider key_schema/value_schema to be compounds naming an attribute/member of the object/namespace reader.

This is also more consistent with fastavro (writer_schema, reader_schema):
https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader

and with the REST Proxy (key_schema and value_schema):
https://docs.confluent.io/current/kafka-rest/docs/api.html#post--topics-(string-topic_name)


schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
@@ -119,7 +121,7 @@ def __init__(self, config, schema_registry=None):
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(config)
self._serializer = MessageSerializer(schema_registry)
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

def poll(self, timeout=None):
"""
@@ -139,9 +141,9 @@ def poll(self, timeout=None):
return message
if not message.error():
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value())
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key())
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
return message
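
With these changes, an application can ask the consumer to decode every message against an explicit Avro reader schema. A minimal usage sketch of the new parameters (broker address, registry URL, schema path, and topic name are placeholders, not taken from this PR):

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroConsumer

    # The reader schema describes the shape the application wants records in,
    # independent of the (possibly older or newer) writer schema on the wire.
    reader_value_schema = avro.load("user_v2.avsc")  # placeholder path

    c = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',            # placeholder
        'schema.registry.url': 'http://localhost:8081',   # placeholder
        'group.id': 'example-group',
    }, reader_value_schema=reader_value_schema)

    c.subscribe(['users'])  # placeholder topic
    msg = c.poll(1.0)
    if msg is not None and not msg.error():
        # Decoded against the reader schema, so optional fields missing from
        # the written record arrive with their declared defaults.
        print(msg.value())
    c.close()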
35 changes: 24 additions & 11 deletions confluent_kafka/avro/serializer/message_serializer.py
@@ -68,10 +68,12 @@ class MessageSerializer(object):
All decode_* methods expect a buffer received from kafka.
"""

def __init__(self, registry_client):
def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None):
self.registry_client = registry_client
self.id_to_decoder_func = {}
self.id_to_writers = {}
self.reader_key_schema = reader_key_schema
self.reader_value_schema = reader_value_schema

'''

@@ -151,33 +153,38 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
return outf.getvalue()

# Decoder support
def _get_decoder_func(self, schema_id, payload):
def _get_decoder_func(self, schema_id, payload, is_key=False):
if schema_id in self.id_to_decoder_func:
return self.id_to_decoder_func[schema_id]

# fetch from schema reg
# fetch writer schema from schema reg
try:
schema = self.registry_client.get_by_id(schema_id)
writer_schema_obj = self.registry_client.get_by_id(schema_id)
except ClientError as e:
raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))

if schema is None:
if writer_schema_obj is None:
raise SerializerError("unable to fetch schema with id %d" % (schema_id))

curr_pos = payload.tell()

reader_schema_obj = self.reader_key_schema if is_key else self.reader_value_schema

if HAS_FAST:
# try to use fast avro
try:
schema_dict = schema.to_json()
schemaless_reader(payload, schema_dict)
writer_schema = writer_schema_obj.to_json()
reader_schema = reader_schema_obj.to_json()
Comment:
Sorry I'm a bit late to the party here.

@fkaufer, correct me if I'm wrong, but my understanding is that self.reader_value_schema can be None, and so reader_schema_obj could be None as well. Wouldn't that line cause an NPE?

Also, I wonder if the next line should not be schemaless_reader(payload, writer_schema, reader_schema), to be consistent with the code executed by the lambda this function returns.

Contributor:
The next line is a sanity check, as alluded to in the comment below it. It is in place to catch edge cases which fastavro did not cover at the time the serializer was written. It's unclear whether this is still actually an issue, but it only happens upon instantiating the writer.

You won't get an NPE, as the None type is an actual null, but you will get an AttributeError, which will cause the serializer to fall back to slow avro, as we call it. If you haven't already done so, please feel free to open an issue, which I will address tomorrow.

Comment:
@rnpridgeon, thanks, this is helpful. I had already opened an issue for a related problem: #518. I think both can be addressed at the same time.

I actually tried to fix it myself, but couldn't get the tests to run. I would need to spend more time setting up my development environment.
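
A possible guard for the None reader schema discussed above, as a hypothetical sketch (not part of this PR; fastavro's schemaless_reader accepts reader_schema=None, in which case no schema resolution is performed):

    # Hypothetical fix: only call .to_json() when a reader schema was supplied,
    # so the fastavro path works without an explicit reader schema instead of
    # raising AttributeError and falling back to slow avro.
    writer_schema = writer_schema_obj.to_json()
    reader_schema = reader_schema_obj.to_json() if reader_schema_obj else None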

schemaless_reader(payload, writer_schema)

# If we reach this point, this means we have fastavro and it can
# do this deserialization. Rewind since this method just determines
# the reader function and we need to deserialize again along the
# normal path.
payload.seek(curr_pos)

self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict)
self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(
p, writer_schema, reader_schema)
return self.id_to_decoder_func[schema_id]
except Exception:
# Fast avro failed, fall thru to standard avro below.
@@ -186,7 +193,13 @@ def _get_decoder_func(self, schema_id, payload):
# here means we should just delegate to slow avro
# rewind
payload.seek(curr_pos)
avro_reader = avro.io.DatumReader(schema)
# Avro DatumReader py2/py3 inconsistency, hence no param keywords
# should be revisited later
# https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459
# https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423
# def __init__(self, writers_schema=None, readers_schema=None)
# def __init__(self, writer_schema=None, reader_schema=None)
avro_reader = avro.io.DatumReader(writer_schema_obj, reader_schema_obj)

def decoder(p):
bin_decoder = avro.io.BinaryDecoder(p)
@@ -195,7 +208,7 @@ def decoder(p):
self.id_to_decoder_func[schema_id] = decoder
return self.id_to_decoder_func[schema_id]

def decode_message(self, message):
def decode_message(self, message, is_key=False):
"""
Decode a message from kafka that has been encoded for use with
the schema registry.
Expand All @@ -212,5 +225,5 @@ def decode_message(self, message):
magic, schema_id = struct.unpack('>bI', payload.read(5))
if magic != MAGIC_BYTE:
raise SerializerError("message does not start with magic byte")
decoder_func = self._get_decoder_func(schema_id, payload)
decoder_func = self._get_decoder_func(schema_id, payload, is_key)
return decoder_func(payload)
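
For reference, decode_message above parses Confluent's Avro wire format: one zero magic byte, a 4-byte big-endian schema id, then the Avro-encoded body. A standalone sketch of that framing, mirroring the struct.unpack call in the diff (function and variable names here are illustrative):

    import io
    import struct

    MAGIC_BYTE = 0

    def parse_wire_header(message_bytes):
        """Split a Confluent-framed message into (schema_id, payload stream)."""
        payload = io.BytesIO(message_bytes)
        # '>bI': big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema id)
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != MAGIC_BYTE:
            raise ValueError("message does not start with magic byte")
        return schema_id, payload  # positioned at the start of the Avro body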
114 changes: 112 additions & 2 deletions examples/integration_test.py
@@ -328,7 +328,6 @@ def verify_avro():
for i, combo in enumerate(combinations):
combo['topic'] = str(uuid.uuid4())
p.produce(**combo)
p.poll(0)
p.flush()

# Create consumer
@@ -1202,9 +1201,117 @@ def verify_config(expconfig, configs):
print("Topic {} marked for deletion".format(our_topic))


# Exclude throttle since from default list
def verify_avro_explicit_read_schema():
""" verify that reading Avro with explicit reader schema works"""
from confluent_kafka import avro
avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')

# Producer config
conf = {'bootstrap.servers': bootstrap_servers,
'error_cb': error_cb,
'api.version.request': api_version_request,
'default.topic.config': {'produce.offset.report': True}}

# Create producer
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
p = avro.AvroProducer(conf)
else:
p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())

key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
schema1 = avro.load(os.path.join(avsc_dir, "user_v1.avsc"))
schema2 = avro.load(os.path.join(avsc_dir, "user_v2.avsc"))
float_value = 32.
val = {
"name": "abc",
"favorite_number": 42,
"favorite_colo": "orange"
}
val1 = {
"name": "abc"
}

combinations = [
dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
reader_value_schema=schema1, reader_key_schema=key_schema),
dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
reader_value_schema=schema2, reader_key_schema=key_schema),
]

# Consumer config
cons_conf = {'bootstrap.servers': bootstrap_servers,
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': api_version_request,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

for i, combo in enumerate(combinations):
reader_key_schema = combo.pop("reader_key_schema")
reader_value_schema = combo.pop("reader_value_schema")
combo['topic'] = str(uuid.uuid4())
p.produce(**combo)
p.poll(0)
p.flush()

# Create consumer
conf = copy(cons_conf)
if schema_registry_url:
conf['schema.registry.url'] = schema_registry_url
c = avro.AvroConsumer(
conf,
reader_key_schema=reader_key_schema,
reader_value_schema=reader_value_schema)
else:
c = avro.AvroConsumer(
conf,
schema_registry=InMemorySchemaRegistry(),
reader_key_schema=reader_key_schema,
reader_value_schema=reader_value_schema)

c.subscribe([combo['topic']])

while True:
msg = c.poll(0)
if msg is None:
continue

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
break
else:
continue

tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value(), tstype, timestamp))

# omit empty Avro fields from payload for comparison
record_key = msg.key()
record_value = msg.value()
if isinstance(msg.key(), dict):
record_key = {k: v for k, v in msg.key().items() if v is not None}

if isinstance(msg.value(), dict):
record_value = {k: v for k, v in msg.value().items() if v is not None}

assert combo.get('key') == record_key
assert combo.get('value')['name'] == record_value['name']
c.commit(msg, asynchronous=False)
# Close consumer
c.close()
pass


default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
all_modes = default_modes + ['throttle', 'avro-https', 'none']

"""All test modes"""


Expand Down Expand Up @@ -1315,6 +1422,9 @@ def resolve_envs(_conf):
print('=' * 30, 'Verifying AVRO', '=' * 30)
verify_avro()

print('=' * 30, 'Verifying AVRO with explicit reader schema', '=' * 30)
verify_avro_explicit_read_schema()

if 'avro-https' in modes:
print('=' * 30, 'Verifying AVRO with HTTPS', '=' * 30)
verify_avro_https()
8 changes: 8 additions & 0 deletions tests/avro/user_v1.avsc
@@ -0,0 +1,8 @@
{
"type": "record",
"name": "UserKey",
"aliases": ["User"],
"fields": [
{"name": "name", "type": "string"}
]
}
10 changes: 10 additions & 0 deletions tests/avro/user_v2.avsc
@@ -0,0 +1,10 @@
{
"type": "record",
"name": "User",
"aliases": ["UserKey"],
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["null", "int"], "default": null},
{"name": "favorite_color", "type": ["null", "string"], "default": null}
]
}
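
The two schemas above differ only in the optional favorite_number and favorite_color fields, and the aliases let the differently named records (User vs. UserKey) resolve against each other. A small demonstration of the resolution rules the new test relies on, assuming fastavro is installed and honors record aliases (paths and values are illustrative):

    import io
    from fastavro import schemaless_writer, schemaless_reader
    from fastavro.schema import load_schema

    v1 = load_schema("tests/avro/user_v1.avsc")
    v2 = load_schema("tests/avro/user_v2.avsc")

    # Write with the newer schema, read with the older one:
    # fields unknown to the reader are simply skipped.
    buf = io.BytesIO()
    schemaless_writer(buf, v2, {"name": "abc", "favorite_number": 42,
                                "favorite_color": "orange"})
    buf.seek(0)
    assert schemaless_reader(buf, v2, v1) == {"name": "abc"}

    # Write with the older schema, read with the newer one:
    # missing optional fields take their declared defaults.
    buf = io.BytesIO()
    schemaless_writer(buf, v1, {"name": "abc"})
    buf.seek(0)
    assert schemaless_reader(buf, v1, v2) == {
        "name": "abc", "favorite_number": None, "favorite_color": None}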