Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: Consumer group stale static member properties on rejoin #23693

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 64 additions & 9 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,20 @@ ss::future<join_group_response> group::add_member(member_ptr member) {
}

void group::update_member_no_join(
member_ptr member, chunked_vector<member_protocol>&& new_protocols) {
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout) {
vlog(
_ctxlog.trace,
"Updating {}joining member {} with protocols {}",
"Updating {}joining member {} with protocols {} and timeouts {}/{}",
member->is_joining() ? "" : "non-",
member,
new_protocols);
new_protocols,
new_session_timeout,
new_rebalance_timeout);

/*
* before updating the member, subtract its existing protocols from
Expand All @@ -386,11 +393,29 @@ void group::update_member_no_join(
for (auto& p : member->protocols()) {
_supported_protocols[p.name]++;
}

if (new_client_id) {
member->replace_client_id(*new_client_id);
}
member->replace_client_host(new_client_host);
member->replace_session_timeout(new_session_timeout);
member->replace_rebalance_timeout(new_rebalance_timeout);
}

ss::future<join_group_response> group::update_member(
member_ptr member, chunked_vector<member_protocol>&& new_protocols) {
update_member_no_join(member, std::move(new_protocols));
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout) {
update_member_no_join(
member,
std::move(new_protocols),
new_client_id,
new_client_host,
new_session_timeout,
new_rebalance_timeout);

if (!member->is_joining()) {
_num_members_joining++;
Expand Down Expand Up @@ -696,7 +721,23 @@ group::join_group_stages group::update_static_member_and_rebalance(
* with new member id.</kafka>
*/
schedule_next_heartbeat_expiration(member);
auto f = update_member(member, native_member_protocols(r));

kafka::client_id old_client_id = member->client_id();
kafka::client_host old_client_host = member->client_host();
auto old_session_timeout
= std::chrono::duration_cast<std::chrono::milliseconds>(
member->session_timeout());
auto old_rebalance_timeout
= std::chrono::duration_cast<std::chrono::milliseconds>(
member->rebalance_timeout());

auto f = update_member(
member,
native_member_protocols(r),
r.client_id,
r.client_host,
r.data.session_timeout_ms,
r.data.rebalance_timeout_ms);
auto old_protocols = _members.at(new_member_id)->protocols().copy();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an understanding of why old_protocols are a backup of the new protocols?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any reason why it would make sense to store the "old" protocols after they have already been updated, so I believe it is a bug. However, since I couldn't test it, I preferred not to touch it as part of this change and to follow up on it in a separate thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch (state()) {
case group_state::stable: {
Expand All @@ -715,7 +756,11 @@ group::join_group_stages group::update_static_member_and_rebalance(
instance_id = *r.data.group_instance_id,
new_member_id = std::move(new_member_id),
old_member_id = std::move(old_member_id),
old_protocols = std::move(old_protocols)](
old_protocols = std::move(old_protocols),
old_client_id = std::move(old_client_id),
old_client_host = std::move(old_client_host),
old_session_timeout = old_session_timeout,
old_rebalance_timeout = old_rebalance_timeout](
result<raft::replicate_result> result) mutable {
if (!result) {
vlog(
Expand All @@ -728,7 +773,12 @@ group::join_group_stages group::update_static_member_and_rebalance(
auto member = replace_static_member(
instance_id, new_member_id, old_member_id);
update_member_no_join(
member, std::move(old_protocols));
member,
std::move(old_protocols),
old_client_id,
old_client_host,
old_session_timeout,
old_rebalance_timeout);
schedule_next_heartbeat_expiration(member);
try_finish_joining_member(
member,
Expand Down Expand Up @@ -1036,7 +1086,12 @@ group::join_group_stages group::add_member_and_rebalance(
group::join_group_stages
group::update_member_and_rebalance(member_ptr member, join_group_request&& r) {
auto response = update_member(
std::move(member), native_member_protocols(r));
std::move(member),
native_member_protocols(r),
r.client_id,
r.client_host,
r.data.session_timeout_ms,
r.data.rebalance_timeout_ms);
try_prepare_rebalance();
return join_group_stages(std::move(response));
}
Expand Down
15 changes: 13 additions & 2 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,24 @@ class group final : public ss::enable_lw_shared_from_this<group> {
* \returns join response promise set at the end of the join phase.
*/
ss::future<join_group_response> update_member(
member_ptr member, chunked_vector<member_protocol>&& new_protocols);
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout);

/**
* Same as update_member but without returning the join promise. Used when
* reverting member state after failed group checkpoint
*/
void update_member_no_join(
member_ptr member, chunked_vector<member_protocol>&& new_protocols);
member_ptr member,
chunked_vector<member_protocol>&& new_protocols,
const std::optional<kafka::client_id>& new_client_id,
const kafka::client_host& new_client_host,
std::chrono::milliseconds new_session_timeout,
std::chrono::milliseconds new_rebalance_timeout);

/**
* \brief Get the timeout duration for rebalancing.
Expand Down
28 changes: 28 additions & 0 deletions src/v/kafka/server/member.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ class group_member {

/// Replace the member's id with \p new_id.
void replace_id(member_id new_id) {
    _state.id = std::move(new_id);
}

/// \returns a const reference to the client_id stored in the member state.
const kafka::client_id& client_id() const {
    return _state.client_id;
}

/// Overwrite the client_id stored in the member state.
///
/// \param new_client_id value moved into the member state.
void replace_client_id(kafka::client_id new_client_id) {
    _state.client_id = std::move(new_client_id);
}

/// \returns a const reference to the client_host stored in the member state.
const kafka::client_host& client_host() const {
    return _state.client_host;
}

/// Overwrite the client_host stored in the member state.
///
/// \param new_client_host value moved into the member state.
void replace_client_host(kafka::client_host new_client_host) {
    _state.client_host = std::move(new_client_host);
}

/// \returns a const reference to the id of the group this member belongs to.
const kafka::group_id& group_id() const {
    return _group_id;
}

Expand All @@ -115,9 +131,21 @@ class group_member {
/// \returns the session timeout stored in the member state.
duration_type session_timeout() const {
    return _state.session_timeout;
}

/// Overwrite the session timeout stored in the member state.
///
/// \param new_session_timeout new timeout, given in milliseconds.
void replace_session_timeout(std::chrono::milliseconds new_session_timeout) {
    _state.session_timeout = new_session_timeout;
}

/// \returns the rebalance timeout stored in the member state.
duration_type rebalance_timeout() const {
    return _state.rebalance_timeout;
}

/// Overwrite the rebalance timeout stored in the member state.
///
/// \param new_rebalance_timeout new timeout, given in milliseconds.
void replace_rebalance_timeout(
  std::chrono::milliseconds new_rebalance_timeout) {
    _state.rebalance_timeout = new_rebalance_timeout;
}

/// \returns a const reference to the member's protocol type.
const kafka::protocol_type& protocol_type() const {
    return _protocol_type;
}

Expand Down
71 changes: 71 additions & 0 deletions src/v/kafka/server/tests/group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "cluster/partition.h"
#include "config/configuration.h"
#include "container/fragmented_vector.h"
#include "kafka/protocol/types.h"
#include "kafka/server/group.h"
#include "kafka/server/group_metadata.h"
#include "utils/to_string.h"
Expand All @@ -21,6 +22,8 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>

#include <chrono>

using namespace std::chrono_literals;
namespace kafka {

Expand Down Expand Up @@ -499,4 +502,72 @@ SEASTAR_THREAD_TEST_CASE(group_state_output) {
BOOST_TEST(s == "PreparingRebalance");
}

/// Joins two static members that share one group_instance_id and checks that
/// the second join replaces the first member and that the member's client id,
/// client host and both timeouts match the most recent join request.
SEASTAR_THREAD_TEST_CASE(add_new_static_member) {
    auto g = get();
    const kafka::group_id shared_group_id = g.id();
    const kafka::group_instance_id shared_instance_id{"0-0"};

    // Builds a join request for the shared group / static instance id that
    // carries the given client properties and timeouts.
    const auto make_join_request =
      [&](
        const kafka::client_id& cid,
        const kafka::client_host& chost,
        std::chrono::milliseconds session_timeout,
        std::chrono::milliseconds rebalance_timeout) {
          join_group_request req;
          req.client_id = cid;
          req.client_host = chost;
          req.data.group_id = shared_group_id;
          req.data.group_instance_id = shared_instance_id;
          req.data.session_timeout_ms = session_timeout;
          req.data.rebalance_timeout_ms = rebalance_timeout;
          return req;
      };

    // Looks the member up by id and checks every property under test.
    const auto expect_member =
      [&](
        const kafka::member_id& id,
        const kafka::client_id& cid,
        const kafka::client_host& chost,
        std::chrono::milliseconds session_timeout,
        std::chrono::milliseconds rebalance_timeout) {
          const auto found = g.get_member(id);
          BOOST_TEST(found->group_id() == shared_group_id);
          BOOST_TEST(found->id() == id);
          BOOST_TEST(found->client_id() == cid);
          BOOST_TEST(found->client_host() == chost);
          BOOST_TEST(found->session_timeout() == session_timeout);
          BOOST_TEST(found->rebalance_timeout() == rebalance_timeout);
      };

    // --- first join -------------------------------------------------------
    const kafka::member_id first_id{"m1"};
    const kafka::client_id first_client_id{"client-id-1"};
    const kafka::client_host first_client_host{"client-host-1"};
    const std::chrono::milliseconds first_session_timeout{30001};
    const std::chrono::milliseconds first_rebalance_timeout{45001};

    // adding first static member will call "add_member_and_rebalance"
    g.add_new_static_member(
      first_id,
      make_join_request(
        first_client_id,
        first_client_host,
        first_session_timeout,
        first_rebalance_timeout));

    // the group must now know the first member, with the requested properties
    BOOST_TEST(g.contains_member(first_id));
    expect_member(
      first_id,
      first_client_id,
      first_client_host,
      first_session_timeout,
      first_rebalance_timeout);

    // --- second join, same instance id, different properties ---------------
    const kafka::member_id second_id{"m2"};
    const kafka::client_id second_client_id{"client-id-2"};
    const kafka::client_host second_client_host{"client-host-2"};
    const std::chrono::milliseconds second_session_timeout{30002};
    const std::chrono::milliseconds second_rebalance_timeout{45002};

    // adding second static member will call
    // "update_static_member_and_rebalance"
    g.add_new_static_member(
      second_id,
      make_join_request(
        second_client_id,
        second_client_host,
        second_session_timeout,
        second_rebalance_timeout));

    // the first member id must be gone and the second one present
    BOOST_TEST(!g.contains_member(first_id));
    BOOST_TEST(g.contains_member(second_id));

    // the surviving member must carry the properties of the second request
    expect_member(
      second_id,
      second_client_id,
      second_client_host,
      second_session_timeout,
      second_rebalance_timeout);
}

} // namespace kafka
83 changes: 83 additions & 0 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,89 @@ async def create_groups(r):

assert len(list) == groups_in_round * rounds

@cluster(num_nodes=5)
def test_consumer_static_member_update(self):
    """
    Check that a static member re-joining the group with a different
    client id is reported with the new client id.

    Two consumers are run one after the other with the same instance id
    but different "client.id" values; after each one has joined and
    consumed messages, the group description is validated.
    """
    self.create_topic(20)

    group = 'test-gr-1'

    rpk = RpkTool(self.redpanda)

    def join_and_verify(ordinal, client_id):
        # Start a static consumer with the given client id, wait until it
        # is the single group member and has consumed some messages, then
        # validate the described group. The started consumer is returned
        # so that the caller controls the clean-up order.
        consumer = self.create_consumer(
            topic=self.topic_spec.name,
            group=group,
            instance_name="static-consumer",
            instance_id="panda-instance",
            consumer_properties={"client.id": client_id})

        consumer.start()

        self.wait_for_members(group, 1)

        # wait for some messages
        self.start_producer()
        wait_until(
            lambda: ConsumerGroupTest.consumed_at_least([consumer], 50),
            timeout_sec=30,
            backoff_sec=2,
            err_msg=f"consumer{ordinal} did not consume messages")

        # validate the group as seen by rpk
        described = rpk.group_describe(group)

        assert described.state == "Stable", f"Describe: {described}"
        assert described.members == 1, f"Describe: {described}"
        for partition in described.partitions:
            assert partition.client_id == client_id, f"Describe: {partition}"

        return consumer

    # first consumer: establishes the initial state
    first = join_and_verify(1, 'my-client-1')

    # clean up: producer first, then the consumer
    self.producer.wait()
    self.producer.free()

    first.stop()
    first.wait()
    first.free()

    # second consumer: same instance_id but different client_id
    second = join_and_verify(2, 'my-client-2')

    # clean up: consumer first, then the producer
    second.stop()
    second.wait()
    second.free()

    self.producer.wait()
    self.producer.free()


@dataclass
class OffsetAndMetadata():
Expand Down