Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka offset checking test #1212

Merged
Prev Previous commit
Next Next commit
Revert "Declare OffsetChecker as a MRC component so that it executes …
…prior to the source's yield returns."

This reverts commit 948f4ef.
  • Loading branch information
dagardner-nv committed Sep 21, 2023
commit b3d73ae8d443094fb50c38eba45dc305feb33ccd
14 changes: 6 additions & 8 deletions tests/test_kafka_source_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,13 @@ def _offset_checker(self, x):
new_offsets = self._client.list_consumer_group_offsets(self._group_id)

if self._offsets is not None:
at_least_one_gt = len(new_offsets) > len(self._offsets)
if not at_least_one_gt:
for (topic_partition, prev_offset) in self._offsets.items():
new_offset = new_offsets[topic_partition]
for (topic_partition, prev_offset) in self._offsets.items():
new_offset = new_offsets[topic_partition]

assert new_offset.offset >= prev_offset.offset
assert new_offset.offset >= prev_offset.offset

if new_offset.offset > prev_offset.offset:
at_least_one_gt = True
if new_offset.offset > prev_offset.offset:
at_least_one_gt = True

assert at_least_one_gt

Expand All @@ -151,7 +149,7 @@ def _offset_checker(self, x):
return x

def _build_single(self, builder: mrc.Builder, input_stream):
node = builder.make_node_component(self.unique_name, ops.map(self._offset_checker))
node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
Expand Down