The messages are not cleared from topic once consumed. #61
Closed
Description
Describe the bug
Created a mock producer and consumer as shown in the snippet. When a unit test is run, code uses mock producer to produce multiple messages to 2 topics. Testcase checks if the produced messages are received, via polling the mock consumer.
When multiple messages are produced on > 1 topic, the behavior is inconsistent.
- The messages are not cleared from topic once consumed.
- All messages are not received on the expected topic.
Even with a single topic is used and only produce multiple messages on that topic-
- The messages are not cleared from topic once consumed.
To Reproduce
- replace constants in files below. Run with TestCase to produce multiple messages on same topic, check consumed messages.
- replace constants in files below. Run with TestCase to produce multiple messages on different topic, check consumed messages.
previously consumed messages are still shown.
import json
import uuid
from random import randint
from mockafka import FakeProducer, setup_kafka, FakeAdminClientImpl
from mockafka.admin_client import NewTopic
class MockProducer:
def __init__(self):
admin = FakeAdminClientImpl()
admin.create_topics([
NewTopic(topic=KAFKA_TOPIC_AUDITS, num_partitions=2),
NewTopic(topic=KAFKA_TOPIC_RAW_DOCS, num_partitions=2)
])
self._producer = FakeProducer()
def produce(self, topic: str, key: str, message: dict):
try:
logger.info(f'sent msg to {topic}, key: {key}, {json.dumps(message, indent=4)}')
self._producer.produce(
key=key + str(uuid.uuid4()),
value=message,
topic=topic,
partition=randint(0, 1)
)
except Exception as e:
logger.error(f"kafka produce error:{e} for topic:{topic}, key:{key}, message:{message}")
from typing import Any
from mockafka import FakeConsumer
class MockConsumer:
def __init__(self):
self._consumer = FakeConsumer()
def consume(self, topics, msg_cnt=1) -> Any:
self._consumer.subscribe(topics)
messages = []
cnt = 1
while cnt <= msg_cnt:
message = self._consumer.poll(timeout=1.0)
print("mock consumer received:", message.value())
if message is not None:
messages.append(message.value())
cnt = cnt + 1
self._consumer.commit(message)
# self._consumer.unsubscribe(topics)
return messages
def close(self):
self._consumer.close()
Expected behavior
The messages should be removed from topic once consumed.
Screenshots
If applicable, add screenshots to help explain your problem.
Desktop (please complete the following information):
- OS: Mac
- python ver- 3.11
- lib version mockafka-py==0.1.53
Additional context