
Enable SCRAM-SHA-256 and SCRAM-SHA-512 for sasl #1918


Merged
46 commits, merged on Dec 29, 2019
Changes from 1 commit

Commits (46):
ce61dfa  add missing dev requirement 'mock' (Oct 1, 2019)
b565067  extend KafkaFixture to support scram and plain sasl mechanisms (Oct 1, 2019)
fd8f11c  write test for PLAIN sasl mechanism (Oct 1, 2019)
67d5276  normalize KafkaFixture.get_clients, .get_consumers and .get_producers (Oct 2, 2019)
471029b  refactor KafkaFixture.get_clients, .get_consumers and .get_producers (Oct 2, 2019)
f0b0389  add get_topic_names to KafkaFixture (Oct 2, 2019)
d6761bd  fix bug in KafkaFixture.create_topics (Oct 2, 2019)
6077c6c  create function to turn special chars to underscore (Oct 2, 2019)
37d100d  improve KafkaFixture.get_consumers (Oct 2, 2019)
d297767  improve sasl tests and enable tests for scram (Oct 2, 2019)
a97386d  implement scram (Oct 2, 2019)
3c3c5a0  make sure to wait for scram credentials during kafka startup (Oct 2, 2019)
9785c05  ensure compatibility with older python versions (Oct 3, 2019)
0570d91  enable logging for tests (Oct 15, 2019)
d7fe59b  add more output to SpawnedService (Oct 15, 2019)
22b6190  add more checks if SpawnedService is actually running (Oct 15, 2019)
84b57ed  switch to actual jaas config file for kafka (Oct 15, 2019)
01f48d7  remove sasl mechanism settings from server.properties for 1.1.1 (Oct 15, 2019)
3f399d3  change scram_pattern in KafkaFixture (Oct 15, 2019)
fecdc46  minor refactoring of least_loaded_node (Oct 15, 2019)
9a99f95  update docstrings with information about new sasl mechanisms (Oct 15, 2019)
60702c6  minor refactoring of test_sasl_integration.py (Oct 15, 2019)
b61d68f  trying to make it run in travis (Oct 16, 2019)
e641521  go back to raising runtime error in wait_for (Oct 16, 2019)
720daa3  reduce kafka startup timeout and log config file content (Oct 16, 2019)
de6a360  reduce tests to get quicker feedback (Oct 16, 2019)
1f98ee4  output config file contents earlier (Oct 16, 2019)
fa1ca01  further investigate settings (Oct 16, 2019)
e41c0be  try to get rid of empty log lines (Oct 16, 2019)
7af9f97  move investigation to rendering function (Oct 16, 2019)
0ef668e  stop on first failed test (Oct 16, 2019)
65bb4fa  use strpath in render_template (Oct 16, 2019)
082bb4e  check kafka.properties files for the `sasl` keyword (Oct 16, 2019)
b1d1ee0  add another commit to kafka.properties to make sure changes are not o… (Oct 16, 2019)
a832ae5  Revert "use strpath in render_template" (Oct 16, 2019)
97778fc  limit cache to servers/dist (Oct 16, 2019)
77ec1b3  set offsets.topic.replication.factor to 1 in all kafka.properties (Oct 16, 2019)
8aca0ee  dump logs on sasl_kafka tear down (Oct 16, 2019)
d5e9a0f  try shorter topic name for sasl test (Oct 17, 2019)
a3aadf3  enable debug mode for kafka (Oct 17, 2019)
c0b0437  refactor KafkaFixture._create_topics and include retry logic (Oct 17, 2019)
ca3c304  fix bug in KafkaFixture._send_request (Oct 17, 2019)
e780de8  another minor bugfix (Oct 17, 2019)
1fe4c17  revert changes introduced for debugging (Oct 17, 2019)
bce9e41  minor bugfix (Oct 17, 2019)
54922a0  increase Kafka startup timeout again (Oct 17, 2019)
improve sasl tests and enable tests for scram
Swen Wenzel committed Oct 16, 2019
commit d297767c7286e49d0b29de5680dd4d033debabb0
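
This commit replaces the unittest-based SASLIntegrationTestCase hierarchy with a single parametrized pytest fixture, so each test runs once per SASL mechanism instead of requiring one subclass per mechanism. The pattern in isolation looks like this (illustrative names, not code from the PR):

    import pytest

    @pytest.fixture(params=["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"])
    def mechanism(request):
        # pytest instantiates this fixture once per entry in `params`,
        # so every test requesting it runs three times.
        return request.param

    def test_mechanism_is_supported(mechanism):
        assert mechanism in ("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512")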
123 changes: 66 additions & 57 deletions test/test_sasl_integration.py
@@ -1,70 +1,79 @@
 import logging
 import uuid
 
 import pytest
-from . import unittest
-from .testutil import random_string
 
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import env_kafka_version
-# from kafka.client import SimpleClient, KafkaClient
-# from kafka.producer import KafkaProducer, SimpleProducer
-# from kafka.consumer import SimpleConsumer, KafkaConsumer
-from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.admin import NewTopic
+from kafka.client import KafkaClient
+from kafka.protocol.metadata import MetadataRequest_v1
+from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
 
 
-class SASLIntegrationTestCase(unittest.TestCase):
-    sasl_mechanism = None
-    sasl_transport = "SASL_PLAINTEXT"
-    server = None
-    zk = None
+@pytest.fixture(
+    params=[
+        pytest.param(
+            "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10")
+        ),
+        pytest.param(
+            "SCRAM-SHA-256",
+            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+        ),
+        pytest.param(
+            "SCRAM-SHA-512",
+            marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"),
+        ),
+    ]
+)
+def sasl_kafka(request, kafka_broker_factory):
+    return kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0]
 
-    @classmethod
-    def setUpClass(cls) -> None:
-        cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(
-            0, cls.zk, zk_chroot=random_string(10), transport=cls.sasl_transport, sasl_mechanism=cls.sasl_mechanism
-        )
 
-    @classmethod
-    def tearDownClass(cls) -> None:
-        pass
+def test_admin(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    admin, = sasl_kafka.get_admin_clients(1)
+    admin.create_topics([NewTopic(topic_name, 1, 1)])
+    assert topic_name in sasl_kafka.get_topic_names()
 
-    @classmethod
-    def bootstrap_servers(cls) -> str:
-        return "{}:{}".format(cls.server.host, cls.server.port)
 
-    def test_admin(self):
-        admin = self.create_admin()
-        admin.create_topics([NewTopic('mytopic', 1, 1)])
+def test_produce_and_consume(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    sasl_kafka.create_topics([topic_name], num_partitions=2)
+    producer, = sasl_kafka.get_producers(1)
 
-    def create_admin(self) -> KafkaAdminClient:
-        raise NotImplementedError()
+    messages_and_futures = []  # [(message, produce_future),]
+    for i in range(100):
+        encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8")
+        future = producer.send(topic_name, value=encoded_msg, partition=i % 2)
+        messages_and_futures.append((encoded_msg, future))
+    producer.flush()
 
+    for (msg, f) in messages_and_futures:
+        assert f.succeeded()
 
+    consumer, = sasl_kafka.get_consumers(1, [topic_name])
+    messages = {0: [], 1: []}
+    for i, message in enumerate(consumer, 1):
+        logging.debug("Consumed message %s", repr(message))
+        messages[message.partition].append(message)
+        if i >= 100:
+            break
 
+    assert_message_count(messages[0], 50)
+    assert_message_count(messages[1], 50)
 
-@pytest.mark.skipif(
-    not env_kafka_version() or env_kafka_version() < (0, 10), reason="No KAFKA_VERSION or version too low"
-)
-class TestSaslPlain(SASLIntegrationTestCase):
-    sasl_mechanism = "PLAIN"
 
-    def create_admin(self) -> KafkaAdminClient:
-        return KafkaAdminClient(
-            bootstrap_servers=self.bootstrap_servers(),
-            security_protocol=self.sasl_transport,
-            sasl_mechanism=self.sasl_mechanism,
-            sasl_plain_username=self.server.broker_user,
-            sasl_plain_password=self.server.broker_password
-        )
+def test_client(request, sasl_kafka):
+    topic_name = special_to_underscore(request.node.name + random_string(4))
+    sasl_kafka.create_topics([topic_name], num_partitions=1)
 
-#
-# @pytest.mark.skipif(
-#     not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low"
-# )
-# class TestSaslScram256(SASLIntegrationTestCase):
-#     sasl_mechanism = "SCRAM-SHA-256"
-#
-#
-#
-# @pytest.mark.skipif(
-#     not env_kafka_version() or env_kafka_version() < (0, 10, 2), reason="No KAFKA_VERSION or version too low"
-# )
-# class TestSaslScram512(SASLIntegrationTestCase):
-#     sasl_mechanism = "SCRAM-SHA-512"
+    client: KafkaClient = next(sasl_kafka.get_clients(1), None)
+    request = MetadataRequest_v1(None)
+    client.send(0, request)
+    for _ in range(10):
+        result = client.poll(timeout_ms=10_000)
+        if len(result) > 0:
+            break
+    else:
+        raise TimeoutError("Couldn't fetch topic response from Broker.")
+    result = result[0]
+    assert topic_name in [t[1] for t in result.topics]
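
For context, once this PR is merged a client selects SCRAM by passing the new mechanism names through the existing SASL options; kafka-python reuses sasl_plain_username and sasl_plain_password for SCRAM credentials. A minimal sketch, assuming a hypothetical broker at localhost:9093 with a SASL_PLAINTEXT listener and a SCRAM credential already registered for the user:

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9093",   # hypothetical broker address
        security_protocol="SASL_PLAINTEXT",
        sasl_mechanism="SCRAM-SHA-256",       # or "SCRAM-SHA-512"
        sasl_plain_username="alice",          # hypothetical credentials, registered
        sasl_plain_password="alice-secret",   # on the broker ahead of time
    )
    producer.send("my_topic", b"authenticated via SCRAM")
    producer.flush()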