Skip to content

Don't send empty credentials to SR in Authorization Header #863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __init__(self, conf):
raise ValueError("basic.auth.user.info must be in the form"
" of {username}:{password}")

self.session.auth = userinfo
self.session.auth = userinfo if userinfo != ('', '') else None

# Any leftover keys are unknown to _RestClient
if len(conf_copy) > 0:
Expand Down
5 changes: 3 additions & 2 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ def example_list(a, args):
else:
errstr = ""

print(" partition {} leader: {}, replicas: {}, isrs: {}".format(
p.id, p.leader, p.replicas, p.isrs, errstr))
print("partition {} leader: {}, replicas: {},"
" isrs: {} errstr: {}".format(p.id, p.leader, p.replicas,
p.isrs, errstr))


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/producer/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def prefixed_delivery_cb(prefix):
def delivery_err(err, msg):
""" Reports failed message delivery to aid in troubleshooting test failures. """
if err:
print("[{}]: Message delivery failed (%s [%s]): %s".format(prefix,
(msg.topic(), str(msg.partition()), err)))
print("[{}]: Message delivery failed ({} [{}]): {}".format(
prefix, msg.topic(), str(msg.partition()), err))
return

return delivery_err
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/schema_registry/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,6 @@ def test_api_config_update(kafka_cluster):
"""
sr = kafka_cluster.schema_registry()

for l in ["BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE"]:
sr.set_compatibility(level=l)
assert sr.get_compatibility()['compatibilityLevel'] == l
for level in ["BACKWARD", "BACKWARD_TRANSITIVE", "FORWARD", "FORWARD_TRANSITIVE"]:
sr.set_compatibility(level=level)
assert sr.get_compatibility()['compatibilityLevel'] == level
26 changes: 26 additions & 0 deletions tests/schema_registry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#
import os
import re
from base64 import b64decode
from collections import defaultdict

import pytest
import requests_mock
from requests_mock import create_response

from confluent_kafka.schema_registry.schema_registry_client import \
SchemaRegistryClient
Expand Down Expand Up @@ -68,6 +70,8 @@ class MockSchemaRegistryClient(SchemaRegistryClient):
the only endpoint which supports this is /config which will return an
`Invalid compatibility level` error.

To coerce Authentication errors configure credentials to
not match MockSchemaRegistryClient.USERINFO.

Request paths to trigger exceptions:
+--------+----------------------------------+-------+------------------------------+
Expand Down Expand Up @@ -130,6 +134,7 @@ class MockSchemaRegistryClient(SchemaRegistryClient):
VERSIONS = [1, 2, 3, 4]
SCHEMA = 'basic_schema.avsc'
SUBJECTS = ['subject1', 'subject2']
USERINFO = 'mock_user:mock_password'

# Counts requests handled per path by HTTP method
# {HTTP method: { path : count}}
Expand Down Expand Up @@ -164,8 +169,29 @@ def __init__(self, conf):
adapter.register_uri('POST', self.subject_versions,
json=self.post_subject_version_callback)

adapter.add_matcher(self._auth_matcher)
self._rest_client.session.mount('http://', adapter)

@classmethod
def _auth_matcher(cls, request):
headers = request._request.headers

authinfo = headers.get('Authorization', None)
# Pass request to downstream matchers
if authinfo is None:
return None

# We only support the BASIC scheme today
scheme, userinfo = authinfo.split(" ")
if b64decode(userinfo) == cls.USERINFO:
return None

unauthorized = {'error_code': 401,
'message': "401 Unauthorized"}
return create_response(request=request,
status_code=401,
json=unauthorized)

@staticmethod
def _load_avsc(name):
with open(os.path.join(work_dir, '..', 'integration', 'schema_registry',
Expand Down
19 changes: 19 additions & 0 deletions tests/schema_registry/test_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ def cmp_schema(schema1, schema2):
schema1.schema_type == schema2.schema_type])


def test_basic_auth_unauthorized(mock_schema_registry, load_avsc):
conf = {'url': TEST_URL,
'basic.auth.user.info': "user:secret"}
sr = mock_schema_registry(conf)

with pytest.raises(SchemaRegistryError, match="401 Unauthorized"):
sr.get_subjects()


def test_basic_auth_authorized(mock_schema_registry, load_avsc):
conf = {'url': TEST_URL,
'basic.auth.user.info': mock_schema_registry.USERINFO}
sr = mock_schema_registry(conf)

result = sr.get_subjects()

assert result == mock_schema_registry.SUBJECTS


def test_register_schema(mock_schema_registry, load_avsc):
conf = {'url': TEST_URL}
sr = mock_schema_registry(conf)
Expand Down
4 changes: 2 additions & 2 deletions tests/schema_registry/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def test_config_auth_userinfo():
'basic.auth.user.info': TEST_USERNAME + ':' + TEST_USER_PASSWORD}

test_client = SchemaRegistryClient(conf)
assert test_client._rest_client.session.auth == [TEST_USERNAME,
TEST_USER_PASSWORD]
assert test_client._rest_client.session.auth == (TEST_USERNAME,
TEST_USER_PASSWORD)


def test_config_auth_userinfo_invalid():
Expand Down
7 changes: 4 additions & 3 deletions tools/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@ Create a new virtualenv:

Install the relevant package for your platform:

$ pip install dl-v0.11.4rc1/confluent_kafka-....whl
$ pip install --no-cache-dir --find-links dl-v0.11.4rc1/ confluent-kafka

Verify that the package works, should print the expected Python client
and librdkafka versions:

$ python -c 'import confluent_kafka as ck ; print "py:", ck.version(), "c:", ck.libversion()'
$ python -c 'import confluent_kafka as ck ; print "py: {} c: {}" \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work for python3

.format(ck.version(), ck.libversion())'
py: ('0.11.4', 721920) c: ('0.11.4-RC1', 722121)

## 10. Open a release PR
Expand Down Expand Up @@ -203,7 +204,7 @@ In the same virtualenv as created above:


Verify that the package works and prints the expected version:
$ python -c 'import confluent_kafka as ck ; print "py:", ck.version(), "c:", ck.libversion()'
$ python -c 'import confluent_kafka as ck ; print("py:", ck.version(), "c:", ck.libversion())'
py: ('0.11.4', 721920) c: ('0.11.4-RC1', 722121)


Expand Down