Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/kafka/README.rst
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.. autoclass:: testcontainers.kafka.KafkaContainer
.. title:: testcontainers.kafka.KafkaContainer
.. autoclass:: testcontainers.kafka.RedpandaContainer
6 changes: 6 additions & 0 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka._redpanda import RedpandaContainer

__all__ = [
"KafkaContainer",
"RedpandaContainer",
]


class KafkaContainer(DockerContainer):
Expand Down
82 changes: 82 additions & 0 deletions modules/kafka/testcontainers/kafka/_redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import tarfile
import time
from io import BytesIO
from textwrap import dedent

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs


class RedpandaContainer(DockerContainer):
"""
Redpanda container.

Example:

.. doctest::

>>> from testcontainers.kafka import RedpandaContainer

>>> with RedpandaContainer() as redpanda:
... connection = redpanda.get_bootstrap_server()
"""

TC_START_SCRIPT = "/tc-start.sh"

def __init__(
self,
image: str = "docker.redpanda.com/redpandadata/redpanda:v23.1.13",
**kwargs,
) -> None:
kwargs["entrypoint"] = "sh"
super().__init__(image, **kwargs)
self.redpanda_port = 9092
self.schema_registry_port = 8081
self.with_exposed_ports(self.redpanda_port, self.schema_registry_port)

def get_bootstrap_server(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)
return f"{host}:{port}"

def get_schema_registry_address(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.schema_registry_port)
return f"http://{host}:{port}"

def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)

data = (
dedent(
f"""
#!/bin/bash
/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G \
--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \
--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://{host}:{port}
"""
)
.strip()
.encode("utf-8")
)

self.create_file(data, RedpandaContainer.TC_START_SCRIPT)

def start(self, timeout=10) -> "RedpandaContainer":
script = RedpandaContainer.TC_START_SCRIPT
command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar:
tarinfo = tarfile.TarInfo(name=path)
tarinfo.size = len(content)
tarinfo.mtime = time.time()
tar.addfile(tarinfo, BytesIO(content))
archive.seek(0)
self.get_wrapped_container().put_archive("/", archive)
54 changes: 54 additions & 0 deletions modules/kafka/tests/test_redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
from requests import post, get
from json import dumps

from kafka import KafkaConsumer, KafkaProducer, TopicPartition, KafkaAdminClient
from kafka.admin import NewTopic

from testcontainers.kafka import RedpandaContainer


def test_redpanda_producer_consumer():
with RedpandaContainer() as container:
produce_and_consume_message(container)


@pytest.mark.parametrize("version", ["v23.1.13", "v23.3.10"])
def test_redpanda_confluent_version(version):
with RedpandaContainer(image=f"docker.redpanda.com/redpandadata/redpanda:{version}") as container:
produce_and_consume_message(container)


def test_schema_registry():
with RedpandaContainer() as container:
address = container.get_schema_registry_address()
subject_name = "test-subject-value"
url = f"{address}/subjects"

payload = {"schema": dumps({"type": "string"})}
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
create_result = post(f"{url}/{subject_name}/versions", data=dumps(payload), headers=headers)
assert create_result.status_code == 200

result = get(url)
assert result.status_code == 200
assert subject_name in result.json()


def produce_and_consume_message(container):
topic = "test-topic"
bootstrap_server = container.get_bootstrap_server()

admin = KafkaAdminClient(bootstrap_servers=[bootstrap_server])
admin.create_topics([NewTopic(topic, 1, 1)])

producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
future = producer.send(topic, b"verification message")
future.get(timeout=10)
producer.close()

consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server])
tp = TopicPartition(topic, 0)
consumer.assign([tp])
consumer.seek_to_beginning()
assert consumer.end_offsets([tp])[tp] == 1, "Expected exactly one test message to be present on test topic !"
16 changes: 10 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ psycopg2-binary = "*"
pg8000 = "*"
sqlalchemy = "*"
psycopg = "*"
kafka-python = "^2.0.2"
cassandra-driver = "*"
pytest-asyncio = "0.23.5"
kafka-python-ng = "^2.2.0"

[[tool.poetry.source]]
name = "PyPI"
Expand Down