Skip to content

Commit

Permalink
Merge pull request redpanda-data#23693 from IoannisRP/stale_static_me…
Browse files Browse the repository at this point in the history
…mber

kafka: Consumer group stale static member properties on rejoin
  • Loading branch information
michael-redpanda authored Oct 10, 2024
2 parents 078ba49 + 72365bb commit c3ed8fe
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 11 deletions.
73 changes: 64 additions & 9 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,20 @@ ss::future<join_group_response> group::add_member(member_ptr member) {
}

void group::update_member_no_join(
member_ptr member, chunked_vector<member_protocol>&& new_protocols) {
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout) {
vlog(
_ctxlog.trace,
"Updating {}joining member {} with protocols {}",
"Updating {}joining member {} with protocols {} and timeouts {}/{}",
member->is_joining() ? "" : "non-",
member,
new_protocols);
new_protocols,
new_session_timeout,
new_rebalance_timeout);

/*
* before updating the member, subtract its existing protocols from
Expand All @@ -386,11 +393,29 @@ void group::update_member_no_join(
for (auto& p : member->protocols()) {
_supported_protocols[p.name]++;
}

if (new_client_id) {
member->replace_client_id(*new_client_id);
}
member->replace_client_host(new_client_host);
member->replace_session_timeout(new_session_timeout);
member->replace_rebalance_timeout(new_rebalance_timeout);
}

ss::future<join_group_response> group::update_member(
member_ptr member, chunked_vector<member_protocol>&& new_protocols) {
update_member_no_join(member, std::move(new_protocols));
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout) {
update_member_no_join(
member,
std::move(new_protocols),
new_client_id,
new_client_host,
new_session_timeout,
new_rebalance_timeout);

if (!member->is_joining()) {
_num_members_joining++;
Expand Down Expand Up @@ -696,7 +721,23 @@ group::join_group_stages group::update_static_member_and_rebalance(
* with new member id.</kafka>
*/
schedule_next_heartbeat_expiration(member);
auto f = update_member(member, native_member_protocols(r));

kafka::client_id old_client_id = member->client_id();
kafka::client_host old_client_host = member->client_host();
auto old_session_timeout
= std::chrono::duration_cast<std::chrono::milliseconds>(
member->session_timeout());
auto old_rebalance_timeout
= std::chrono::duration_cast<std::chrono::milliseconds>(
member->rebalance_timeout());

auto f = update_member(
member,
native_member_protocols(r),
r.client_id,
r.client_host,
r.data.session_timeout_ms,
r.data.rebalance_timeout_ms);
auto old_protocols = _members.at(new_member_id)->protocols().copy();
switch (state()) {
case group_state::stable: {
Expand All @@ -715,7 +756,11 @@ group::join_group_stages group::update_static_member_and_rebalance(
instance_id = *r.data.group_instance_id,
new_member_id = std::move(new_member_id),
old_member_id = std::move(old_member_id),
old_protocols = std::move(old_protocols)](
old_protocols = std::move(old_protocols),
old_client_id = std::move(old_client_id),
old_client_host = std::move(old_client_host),
old_session_timeout = old_session_timeout,
old_rebalance_timeout = old_rebalance_timeout](
result<raft::replicate_result> result) mutable {
if (!result) {
vlog(
Expand All @@ -728,7 +773,12 @@ group::join_group_stages group::update_static_member_and_rebalance(
auto member = replace_static_member(
instance_id, new_member_id, old_member_id);
update_member_no_join(
member, std::move(old_protocols));
member,
std::move(old_protocols),
old_client_id,
old_client_host,
old_session_timeout,
old_rebalance_timeout);
schedule_next_heartbeat_expiration(member);
try_finish_joining_member(
member,
Expand Down Expand Up @@ -1036,7 +1086,12 @@ group::join_group_stages group::add_member_and_rebalance(
group::join_group_stages
group::update_member_and_rebalance(member_ptr member, join_group_request&& r) {
auto response = update_member(
std::move(member), native_member_protocols(r));
std::move(member),
native_member_protocols(r),
r.client_id,
r.client_host,
r.data.session_timeout_ms,
r.data.rebalance_timeout_ms);
try_prepare_rebalance();
return join_group_stages(std::move(response));
}
Expand Down
15 changes: 13 additions & 2 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,24 @@ class group final : public ss::enable_lw_shared_from_this<group> {
* \returns join response promise set at the end of the join phase.
*/
ss::future<join_group_response> update_member(
member_ptr member, chunked_vector<member_protocol>&& new_protocols);
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout);

/**
* Same as update_member but without returning the join promise. Used when
* reverting member state after failed group checkpoint
*/
void update_member_no_join(
member_ptr member, chunked_vector<member_protocol>&& new_protocols);
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout);

/**
* \brief Get the timeout duration for rebalancing.
Expand Down
28 changes: 28 additions & 0 deletions src/v/kafka/server/member.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ class group_member {

/// Replace the member's id.
///
/// The new id is taken by value and moved into the member's state.
void replace_id(member_id new_id) { _state.id = std::move(new_id); }

/// Get the member's client_id.
///
/// \returns a const reference to the client_id stored in the member's state.
/// The referenced value is overwritten by replace_client_id(), so copy it
/// first if the previous value is still needed afterwards.
const kafka::client_id& client_id() const { return _state.client_id; }

/// Replace the member's client_id.
///
/// The new client_id is taken by value and moved into the member's state,
/// overwriting the value previously returned by client_id().
void replace_client_id(kafka::client_id new_client_id) {
_state.client_id = std::move(new_client_id);
}

/// Get the member's client_host.
///
/// \returns a const reference to the client_host stored in the member's
/// state. The referenced value is overwritten by replace_client_host(), so
/// copy it first if the previous value is still needed afterwards.
const kafka::client_host& client_host() const { return _state.client_host; }

/// Replace the member's client_host.
///
/// The new client_host is taken by value and moved into the member's state,
/// overwriting the value previously returned by client_host().
void replace_client_host(kafka::client_host new_client_host) {
_state.client_host = std::move(new_client_host);
}

/// Get the id of the member's group.
///
/// \returns a const reference to the group id held directly by the member
/// (the _group_id field, not part of _state).
const kafka::group_id& group_id() const { return _group_id; }

Expand All @@ -115,9 +131,21 @@ class group_member {
/// Get the member's session timeout.
///
/// \returns the session timeout stored in the member's state, by value, as
/// duration_type.
duration_type session_timeout() const { return _state.session_timeout; }

/// Replace the member's session timeout.
///
/// The new timeout is supplied in milliseconds and assigned to the member's
/// state; session_timeout() subsequently reports it as duration_type.
void
replace_session_timeout(std::chrono::milliseconds new_session_timeout) {
_state.session_timeout = new_session_timeout;
}

/// Get the member's rebalance timeout.
///
/// \returns the rebalance timeout stored in the member's state, by value, as
/// duration_type.
duration_type rebalance_timeout() const { return _state.rebalance_timeout; }

/// Replace the member's rebalance timeout.
///
/// The new timeout is supplied in milliseconds and assigned to the member's
/// state; rebalance_timeout() subsequently reports it as duration_type.
void
replace_rebalance_timeout(std::chrono::milliseconds new_rebalance_timeout) {
_state.rebalance_timeout = new_rebalance_timeout;
}

/// Get the member's protocol type.
///
/// \returns a const reference to the protocol type held directly by the
/// member (the _protocol_type field, not part of _state).
const kafka::protocol_type& protocol_type() const { return _protocol_type; }

Expand Down
71 changes: 71 additions & 0 deletions src/v/kafka/server/tests/group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "cluster/partition.h"
#include "config/configuration.h"
#include "container/fragmented_vector.h"
#include "kafka/protocol/types.h"
#include "kafka/server/group.h"
#include "kafka/server/group_metadata.h"
#include "utils/to_string.h"
Expand All @@ -21,6 +22,8 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>

#include <chrono>

using namespace std::chrono_literals;
namespace kafka {

Expand Down Expand Up @@ -499,4 +502,72 @@ SEASTAR_THREAD_TEST_CASE(group_state_output) {
BOOST_TEST(s == "PreparingRebalance");
}

SEASTAR_THREAD_TEST_CASE(add_new_static_member) {
    // Two join requests share one group id and one group instance id. The
    // first registers the static member; the second replaces it and must
    // carry its own client id, client host and timeouts onto the member.
    auto g = get();
    const kafka::group_id common_group_id = g.id();
    const kafka::group_instance_id common_instance_id{"0-0"};

    // Populates a join request for the shared static instance with the
    // per-client properties under test.
    const auto fill_join_request =
      [&common_group_id, &common_instance_id](
        join_group_request& req,
        const kafka::client_id& id,
        const kafka::client_host& host,
        std::chrono::milliseconds session_timeout,
        std::chrono::milliseconds rebalance_timeout) {
          req.client_id = id;
          req.client_host = host;
          req.data.group_id = common_group_id;
          req.data.group_instance_id = common_instance_id;
          req.data.session_timeout_ms = session_timeout;
          req.data.rebalance_timeout_ms = rebalance_timeout;
      };

    // --- first member -----------------------------------------------------
    const kafka::member_id m1_id{"m1"};
    const kafka::client_id m1_client_id{"client-id-1"};
    const kafka::client_host m1_client_host{"client-host-1"};
    const auto m1_session_timeout = 30001ms;
    const auto m1_rebalance_timeout = 45001ms;

    join_group_request r1;
    fill_join_request(
      r1,
      m1_client_id,
      m1_client_host,
      m1_session_timeout,
      m1_rebalance_timeout);

    // adding first static member will call "add_member_and_rebalance"
    g.add_new_static_member(m1_id, std::move(r1));

    // the group must now know the first member ...
    BOOST_TEST(g.contains_member(m1_id));

    // ... and the member must expose exactly what the request carried
    const auto m1 = g.get_member(m1_id);
    BOOST_TEST(m1->group_id() == common_group_id);
    BOOST_TEST(m1->id() == m1_id);
    BOOST_TEST(m1->client_id() == m1_client_id);
    BOOST_TEST(m1->client_host() == m1_client_host);
    BOOST_TEST(m1->session_timeout() == m1_session_timeout);
    BOOST_TEST(m1->rebalance_timeout() == m1_rebalance_timeout);

    // --- second member, same instance id, different properties -------------
    const kafka::member_id m2_id{"m2"};
    const kafka::client_id m2_client_id{"client-id-2"};
    const kafka::client_host m2_client_host{"client-host-2"};
    const auto m2_session_timeout = 30002ms;
    const auto m2_rebalance_timeout = 45002ms;

    join_group_request r2;
    fill_join_request(
      r2,
      m2_client_id,
      m2_client_host,
      m2_session_timeout,
      m2_rebalance_timeout);

    // adding second static member will call
    // "update_static_member_and_rebalance"
    g.add_new_static_member(m2_id, std::move(r2));

    // the old member id is gone and the new one has taken its place
    BOOST_TEST(!g.contains_member(m1_id));
    BOOST_TEST(g.contains_member(m2_id));

    // the replaced member must reflect the second request's properties
    const auto m2 = g.get_member(m2_id);
    BOOST_TEST(m2->group_id() == common_group_id);
    BOOST_TEST(m2->id() == m2_id);
    BOOST_TEST(m2->client_id() == m2_client_id);
    BOOST_TEST(m2->client_host() == m2_client_host);
    BOOST_TEST(m2->session_timeout() == m2_session_timeout);
    BOOST_TEST(m2->rebalance_timeout() == m2_rebalance_timeout);
}

} // namespace kafka
83 changes: 83 additions & 0 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,89 @@ async def create_groups(r):

assert len(list) == groups_in_round * rounds

@cluster(num_nodes=5)
def test_consumer_static_member_update(self):
    """
    Verify that a static member which re-joins the group under the same
    instance id but with a different client id is reported with the new
    client id by a group describe.
    """
    self.create_topic(20)

    group = 'test-gr-1'

    rpk = RpkTool(self.redpanda)

    def launch_static_consumer(client_id):
        # Every consumer in this test shares the same static instance id;
        # only the client id differs between incarnations.
        consumer = self.create_consumer(
            topic=self.topic_spec.name,
            group=group,
            instance_name="static-consumer",
            instance_id="panda-instance",
            consumer_properties={"client.id": client_id})
        consumer.start()
        self.wait_for_members(group, 1)
        return consumer

    def produce_until_consumed(consumer, failure_message):
        # Start the producer and wait for the consumer to see some messages.
        self.start_producer()
        wait_until(
            lambda: ConsumerGroupTest.consumed_at_least([consumer], 50),
            timeout_sec=30,
            backoff_sec=2,
            err_msg=failure_message)

    def check_group_reports(expected_client_id):
        # The group must be stable, have one member, and every described
        # partition must be attributed to the expected client id.
        described = rpk.group_describe(group)

        assert described.state == "Stable", f"Describe: {described}"
        assert described.members == 1, f"Describe: {described}"
        for p in described.partitions:
            assert p.client_id == expected_client_id, f"Describe: {p}"

    def release_producer():
        self.producer.wait()
        self.producer.free()

    def release_consumer(consumer):
        consumer.stop()
        consumer.wait()
        consumer.free()

    # first incarnation of the static member
    consumer1 = launch_static_consumer("my-client-1")
    produce_until_consumed(consumer1, "consumer1 did not consume messages")

    # validate initial state
    check_group_reports('my-client-1')

    # clean up: producer first, then the consumer
    release_producer()
    release_consumer(consumer1)

    # second incarnation: same instance_id but different client_id
    consumer2 = launch_static_consumer("my-client-2")
    produce_until_consumed(consumer2, "consumer2 did not consume messages")

    # validate updated state
    check_group_reports('my-client-2')

    # clean up: consumer first, then the producer
    release_consumer(consumer2)
    release_producer()


@dataclass
class OffsetAndMetadata():
Expand Down

0 comments on commit c3ed8fe

Please sign in to comment.