Skip to content

Add support for HTTP Basic Auth #440

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ dl-*
*.whl
.pytest_cache
staging
tests/docker/conf/tls/*
80 changes: 44 additions & 36 deletions confluent_kafka/avro/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ class AvroProducer(Producer):
def __init__(self, config, default_key_schema=None,
default_value_schema=None, schema_registry=None):

schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
schema_registry_key_location = config.pop("schema.registry.ssl.key.location", None)
sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
if schema_registry_url is None:
raise ValueError("Missing parameter: schema.registry.url")

schema_registry = CachedSchemaRegistryClient(url=schema_registry_url,
ca_location=schema_registry_ca_location,
cert_location=schema_registry_certificate_location,
key_location=schema_registry_key_location)
elif schema_registry_url is not None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroProducer, self).__init__(config)
super(AvroProducer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry)
self._key_schema = default_key_schema
self._value_schema = default_value_schema
Expand Down Expand Up @@ -99,28 +99,30 @@ class AvroConsumer(Consumer):

:param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
and the standard Kafka client configuration (``bootstrap.servers`` et.al)
:param optional a reader schema for the message key
:param optional a reader schema for the message value
:param schema reader_key_schema: a reader schema for the message key
:param schema reader_value_schema: a reader schema for the message value
:raises ValueError: For invalid configurations
"""

def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):

schema_registry_url = config.pop("schema.registry.url", None)
schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
schema_registry_key_location = config.pop("schema.registry.ssl.key.location", None)
sr_conf = {key.replace("schema.registry.", ""): value
for key, value in config.items() if key.startswith("schema.registry")}

if sr_conf.get("basic.auth.credentials.source") == 'SASL_INHERIT':
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
sr_conf['sasl.username'] = config.get('sasl.username', '')
sr_conf['sasl.password'] = config.get('sasl.password', '')

ap_conf = {key: value
for key, value in config.items() if not key.startswith("schema.registry")}

if schema_registry is None:
if schema_registry_url is None:
raise ValueError("Missing parameter: schema.registry.url")

schema_registry = CachedSchemaRegistryClient(url=schema_registry_url,
ca_location=schema_registry_ca_location,
cert_location=schema_registry_certificate_location,
key_location=schema_registry_key_location)
elif schema_registry_url is not None:
schema_registry = CachedSchemaRegistryClient(sr_conf)
elif sr_conf.get("url", None) is not None:
raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

super(AvroConsumer, self).__init__(config)
super(AvroConsumer, self).__init__(ap_conf)
self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

def poll(self, timeout=None):
Expand All @@ -137,13 +139,19 @@ def poll(self, timeout=None):
message = super(AvroConsumer, self).poll(timeout)
if message is None:
return None
if not message.value() and not message.key():
return message

if not message.error():
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
try:
if message.value() is not None:
decoded_value = self._serializer.decode_message(message.value(), is_key=False)
message.set_value(decoded_value)
if message.key() is not None:
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
message.set_key(decoded_key)
except SerializerError as e:
raise SerializerError("Message deserialization failed for message at {} [{}] offset {}: {}".format(
message.topic(),
message.partition(),
message.offset(),
e))
return message
Loading