Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ dl-*
*.whl
.pytest_cache
staging
tests/docker/conf/tls/*
80 changes: 44 additions & 36 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ class AvroProducer(Producer):
def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):

schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
schema_registry_key_location = config.pop("schema.registry.ssl.key.location", None)
sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
if schema_registry_url is None:
raise ValueError("Missing parameter: schema.registry.url")

schema_registry = CachedSchemaRegistryClient(url=schema_registry_url,
ca_location=schema_registry_ca_location,
cert_location=schema_registry_certificate_location,
key_location=schema_registry_key_location)
elif schema_registry_url is not None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroProducer, self).__init__(config)
super(AvroProducer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry)
self._key_schema = default_key_schema
self._value_schema = default_value_schema
Expand Down Expand Up @@ -99,28 +99,30 @@ class AvroConsumer(Consumer):

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
:param optional a reader schema for the message key
:param optional a reader schema for the message value
:param schema reader_key_schema: a reader schema for the message key
:param schema reader_value_schema: a reader schema for the message value
:raises ValueError: For invalid configurations
"""

def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):

schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
schema_registry_key_location = config.pop("schema.registry.ssl.key.location", None)
sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
if schema_registry_url is None:
raise ValueError("Missing parameter: schema.registry.url")

schema_registry = CachedSchemaRegistryClient(url=schema_registry_url,
ca_location=schema_registry_ca_location,
cert_location=schema_registry_certificate_location,
key_location=schema_registry_key_location)
elif schema_registry_url is not None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(config)
super(AvroConsumer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

def poll(self, timeout=None):
Expand All @@ -137,13 +139,19 @@ def poll(self, timeout=None):
message = super(AvroConsumer, self).poll(timeout)
if message is None:
return None
if not message.value() and not message.key():
return message

if not message.error():
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
try:
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
except SerializerError as e:
raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
message.topic(),
message.partition(),
message.offset(),
e))
return message
Loading