Skip to content

Commit

Permalink
kafka: Add ducktape test for updating a group static member
Browse files Browse the repository at this point in the history
  • Loading branch information
IoannisRP committed Oct 9, 2024
1 parent e1bc0df commit 72365bb
Showing 1 changed file with 83 additions and 0 deletions.
83 changes: 83 additions & 0 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,89 @@ async def create_groups(r):

assert len(list) == groups_in_round * rounds

@cluster(num_nodes=5)
def test_consumer_static_member_update(self):
"""
Test validating that re-joining static member will update the client id
"""
self.create_topic(20)

group = 'test-gr-1'

rpk = RpkTool(self.redpanda)

# create and start first consumer
consumer1 = self.create_consumer(
topic=self.topic_spec.name,
group=group,
instance_name="static-consumer",
instance_id="panda-instance",
consumer_properties={"client.id": "my-client-1"})

consumer1.start()

self.wait_for_members(group, 1)

# wait for some messages
self.start_producer()
wait_until(
lambda: ConsumerGroupTest.consumed_at_least([consumer1], 50),
timeout_sec=30,
backoff_sec=2,
err_msg="consumer1 did not consume messages")

# validate initial state
rpk_group_1 = rpk.group_describe(group)

assert rpk_group_1.state == "Stable", f"Describe: {rpk_group_1}"
assert rpk_group_1.members == 1, f"Describe: {rpk_group_1}"
for p in rpk_group_1.partitions:
assert p.client_id == 'my-client-1', f"Describe: {p}"

# clean up
self.producer.wait()
self.producer.free()

consumer1.stop()
consumer1.wait()
consumer1.free()

# create and start consumer with same instance_id but different cliend_id
consumer2 = self.create_consumer(
topic=self.topic_spec.name,
group=group,
instance_name="static-consumer",
instance_id="panda-instance",
consumer_properties={"client.id": "my-client-2"})

consumer2.start()

self.wait_for_members(group, 1)

# wait for some messages
self.start_producer()
wait_until(
lambda: ConsumerGroupTest.consumed_at_least([consumer2], 50),
timeout_sec=30,
backoff_sec=2,
err_msg="consumer2 did not consume messages")

# validate updated state
rpk_group_2 = rpk.group_describe(group)

assert rpk_group_2.state == "Stable", f"Describe: {rpk_group_2}"
assert rpk_group_2.members == 1, f"Describe: {rpk_group_2}"
for p in rpk_group_2.partitions:
assert p.client_id == 'my-client-2', f"Describe: {p}"

# clean up
consumer2.stop()
consumer2.wait()
consumer2.free()

self.producer.wait()
self.producer.free()


@dataclass
class OffsetAndMetadata():
Expand Down

0 comments on commit 72365bb

Please sign in to comment.