Skip to content

Commit

Permalink
Fix implementation of repeated single-message fetching
Browse files Browse the repository at this point in the history
This brings the mock implementations in line with the behaviour
observed in the default configurations of `confluent-kafka` and
`aiokafka` respectively, both of which return subsequent messages
on subsequent calls.
  • Loading branch information
PeterJCLaw committed Jul 6, 2024
1 parent 2aea1e2 commit 581629d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 22 deletions.
19 changes: 9 additions & 10 deletions mockafka/aiokafka/aiokafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,23 @@ async def getone(self):
next_offset = self.kafka.get_partition_next_offset(
topic=topic, partition=partition
)
consumer_amount = self.consumer_store.get(
self._get_key(topic, partition)
)
if first_offset == next_offset:
# Topic is empty
continue

topic_key = self._get_key(topic, partition)

consumer_amount = self.consumer_store.setdefault(
topic_key, first_offset
)
if consumer_amount == next_offset:
# Topic is exhausted
continue

if consumer_amount is not None:
self.consumer_store[self._get_key(topic, partition)] += 1
else:
self.consumer_store[self._get_key(topic, partition)] = (
first_offset + 1
)
self.consumer_store[topic_key] += 1

return self.kafka.get_message(
topic=topic, partition=partition, offset=first_offset
topic=topic, partition=partition, offset=consumer_amount
)

return None
Expand Down
19 changes: 9 additions & 10 deletions mockafka/conumser.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,23 @@ def poll(self, timeout=None):
next_offset = self.kafka.get_partition_next_offset(
topic=topic, partition=partition
)
consumer_amount = self.consumer_store.get(
self._get_key(topic, partition)
)
if first_offset == next_offset:
# Topic is empty
continue

topic_key = self._get_key(topic, partition)

consumer_amount = self.consumer_store.setdefault(
topic_key, first_offset
)
if consumer_amount == next_offset:
# Topic is exhausted
continue

if consumer_amount is not None:
self.consumer_store[self._get_key(topic, partition)] += 1
else:
self.consumer_store[self._get_key(topic, partition)] = (
first_offset + 1
)
self.consumer_store[topic_key] += 1

return self.kafka.get_message(
topic=topic, partition=partition, offset=first_offset
topic=topic, partition=partition, offset=consumer_amount
)

return None
Expand Down
2 changes: 1 addition & 1 deletion tests/test_aiokafka/test_aiokafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def test_poll_without_commit(self):
message = await self.consumer.getone()
self.assertEqual(message.value(payload=None), "test")
message = await self.consumer.getone()
self.assertEqual(message.value(payload=None), "test")
self.assertEqual(message.value(payload=None), "test1")

self.assertIsNone(await self.consumer.getone())
self.assertIsNone(await self.consumer.getone())
Expand Down
2 changes: 1 addition & 1 deletion tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_poll_without_commit(self):
message = self.consumer.poll()
self.assertEqual(message.value(payload=None), "test")
message = self.consumer.poll()
self.assertEqual(message.value(payload=None), "test")
self.assertEqual(message.value(payload=None), "test1")

self.assertIsNone(self.consumer.poll())
self.assertIsNone(self.consumer.poll())
Expand Down

0 comments on commit 581629d

Please sign in to comment.