Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka offset checking test #1212

Merged
Prev Previous commit
Next Next commit
Declare OffsetChecker as a MRC component so that it executes prior to…
… the source's yield returns.

Adjust the offset checker to pass if there is a new partition introduced.
  • Loading branch information
dagardner-nv committed Sep 21, 2023
commit 948f4ef9c2a9d1e5fe64c39ce2e0eae371884639
14 changes: 8 additions & 6 deletions tests/test_kafka_source_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ def _offset_checker(self, x):
new_offsets = self._client.list_consumer_group_offsets(self._group_id)

if self._offsets is not None:
for (topic_partition, prev_offset) in self._offsets.items():
new_offset = new_offsets[topic_partition]
at_least_one_gt = len(new_offsets) > len(self._offsets)
if not at_least_one_gt:
for (topic_partition, prev_offset) in self._offsets.items():
new_offset = new_offsets[topic_partition]

assert new_offset.offset >= prev_offset.offset
assert new_offset.offset >= prev_offset.offset

if new_offset.offset > prev_offset.offset:
at_least_one_gt = True
if new_offset.offset > prev_offset.offset:
at_least_one_gt = True

assert at_least_one_gt

Expand All @@ -149,7 +151,7 @@ def _offset_checker(self, x):
return x

def _build_single(self, builder: mrc.Builder, input_stream):
node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
node = builder.make_node_component(self.unique_name, ops.map(self._offset_checker))
builder.make_edge(input_stream[0], node)

return node, input_stream[1]
Expand Down