The messages are not cleared from topic once consumed.  #61

Closed
@ssmails

Description

Describe the bug
I created a mock producer and a mock consumer, as shown in the snippet. A unit test uses the mock producer to produce multiple messages to two topics, then polls the mock consumer to check that the produced messages are received.
When multiple messages are produced to more than one topic, the behavior is inconsistent:

  • Messages are not cleared from the topic once consumed.
  • Not all messages are received on the expected topic.

Even when only a single topic is used and multiple messages are produced to it:

  • Messages are not cleared from the topic once consumed.

To Reproduce

  • Replace the constants in the files below. Run a test case that produces multiple messages to the same topic, then check the consumed messages.
  • Replace the constants in the files below. Run a test case that produces multiple messages to different topics, then check the consumed messages.

In both cases, previously consumed messages are still returned.

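import json
import logging
import uuid
from random import randint

from mockafka import FakeProducer, setup_kafka, FakeAdminClientImpl
from mockafka.admin_client import NewTopic

logger = logging.getLogger(__name__)

# Placeholder topic names; replace with your own constants.
KAFKA_TOPIC_AUDITS = "audits"
KAFKA_TOPIC_RAW_DOCS = "raw_docs"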

class MockProducer:

    def __init__(self):
        admin = FakeAdminClientImpl()
        admin.create_topics([
            NewTopic(topic=KAFKA_TOPIC_AUDITS, num_partitions=2),
            NewTopic(topic=KAFKA_TOPIC_RAW_DOCS, num_partitions=2)
        ])
        self._producer = FakeProducer()

    def produce(self, topic: str, key: str, message: dict):
        try:
            logger.info(f'sent msg to {topic}, key: {key}, {json.dumps(message, indent=4)}')
            self._producer.produce(
                key=key + str(uuid.uuid4()),
                value=message,
                topic=topic,
                partition=randint(0, 1)
            )
        except Exception as e:
            logger.error(f"kafka produce error:{e} for topic:{topic}, key:{key}, message:{message}")


from typing import Any
from mockafka import FakeConsumer

class MockConsumer:
    def __init__(self):
        self._consumer = FakeConsumer()

    def consume(self, topics, msg_cnt=1) -> Any:
        self._consumer.subscribe(topics)
        messages = []
        while len(messages) < msg_cnt:
            message = self._consumer.poll(timeout=1.0)
            if message is None:
                # Nothing left to read; stop rather than dereference None.
                break
            print("mock consumer received:", message.value())
            messages.append(message.value())
            # Commit only messages that were actually received.
            self._consumer.commit(message)

        # self._consumer.unsubscribe(topics)
        return messages

    def close(self):
        self._consumer.close()

Expected behavior
Messages should be removed from the topic once consumed, so that a later poll does not return them again.
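To make the expectation concrete, here is a minimal, library-free sketch of the behavior I would expect from a test double. It is a toy model only, not how mockafka or Kafka is implemented: the `InMemoryLog` class, the `drain` helper, and the topic names `audits` and `raw_docs` are all hypothetical. It assumes one committed offset per topic, so a committed message is never returned by a later poll and each topic only yields its own messages.

```python
from collections import defaultdict


class InMemoryLog:
    """Toy model of topic logs with one committed offset per topic.

    Purely illustrative; not the mockafka or Kafka implementation.
    """

    def __init__(self):
        self._logs = defaultdict(list)      # topic -> list of messages
        self._committed = defaultdict(int)  # topic -> next offset to read

    def produce(self, topic, value):
        self._logs[topic].append(value)

    def poll(self, topic):
        """Return the next unread message for `topic`, or None if caught up."""
        offset = self._committed[topic]
        if offset >= len(self._logs[topic]):
            return None
        return self._logs[topic][offset]

    def commit(self, topic):
        """Advance the committed offset by one, if there is an unread message."""
        if self._committed[topic] < len(self._logs[topic]):
            self._committed[topic] += 1


def drain(log, topic):
    """Poll and commit until the topic has no unread messages left."""
    received = []
    while (msg := log.poll(topic)) is not None:
        received.append(msg)
        log.commit(topic)
    return received


log = InMemoryLog()
for i in range(3):
    log.produce("audits", f"audit-{i}")
for i in range(2):
    log.produce("raw_docs", f"doc-{i}")

first = drain(log, "audits")     # all three audit messages
second = drain(log, "audits")    # nothing new, so nothing is returned
other = drain(log, "raw_docs")   # only the raw_docs messages
```

In this model the second `drain` on the same topic returns an empty list, which is the behavior the test case expects from the mock consumer after a commit.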

Desktop (please complete the following information):

  • OS: macOS
  • Python version: 3.11
  • Library version: mockafka-py==0.1.53

Labels

bug
