CORE-5766 Validate target node id when collecting health report #22811

Merged
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -95,6 +95,7 @@ enum class errc : int16_t {
data_migration_already_exists,
data_migration_invalid_resources,
resource_is_being_migrated,
invalid_target_node_id,
};

std::ostream& operator<<(std::ostream& o, errc err);
@@ -282,6 +283,8 @@ struct errc_category final : public std::error_category {
case errc::resource_is_being_migrated:
return "Requested operation can not be executed as the resource is "
"undergoing data migration";
case errc::invalid_target_node_id:
return "Request was intended for a node with a different node id";
}
return "cluster::errc::unknown";
}
2 changes: 2 additions & 0 deletions src/v/cluster/errors.cc
@@ -178,6 +178,8 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) {
return o << "cluster::errc::data_migration_invalid_resources";
case errc::resource_is_being_migrated:
return o << "cluster::errc::resource_is_being_migrated";
case errc::invalid_target_node_id:
return o << "cluster::errc::invalid_target_node_id";
}
}
} // namespace cluster
13 changes: 8 additions & 5 deletions src/v/cluster/health_monitor_backend.cc
@@ -360,30 +360,33 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) {
ss::this_shard_id(),
id,
max_metadata_age(),
[timeout](controller_client_protocol client) mutable {
[timeout, id](controller_client_protocol client) mutable {
return client.collect_node_health_report(
get_node_health_request{}, rpc::client_opts(timeout));
get_node_health_request(id), rpc::client_opts(timeout));
})
.then(&rpc::get_ctx_data<get_node_health_reply>)
.then([this, id](result<get_node_health_reply> reply) {
return process_node_reply(id, std::move(reply));
});
}

result<node_health_report>
map_reply_result(result<get_node_health_reply> reply) {
result<node_health_report> map_reply_result(
model::node_id target_node_id, result<get_node_health_reply> reply) {
if (!reply) {
return {reply.error()};
}
if (!reply.value().report.has_value()) {
return {reply.value().error};
}
if (reply.value().report->id != target_node_id) {
[Review comment — Contributor]
What's the point of rechecking it here, if we have already checked it on the server side in service::do_collect_node_health_report? I guess it only makes sense for some corner cases when nodes run different versions of Redpanda?

[Reply — Member, Author]
Exactly.
(A sketch of that mixed-version scenario follows this file's diff.)

return {errc::invalid_target_node_id};
}
return {std::move(*reply.value().report).to_in_memory()};
}

result<node_health_report> health_monitor_backend::process_node_reply(
model::node_id id, result<get_node_health_reply> reply) {
auto res = map_reply_result(std::move(reply));
auto res = map_reply_result(id, std::move(reply));
auto [status_it, _] = _status.try_emplace(id);
if (!res) {
vlog(
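The review exchange above concerns mixed-version clusters: a node running an older Redpanda decodes only the version-0 fields of the request, never sees the target node id, and replies with its own report regardless of which node was asked. That is why the requester re-validates the id in map_reply_result even though the upgraded server already checks it. Below is a minimal, self-contained sketch of this two-sided check; the types and names are illustrative stand-ins, not Redpanda's actual API.

#include <cassert>
#include <optional>

// Illustrative stand-ins, not the real cluster types.
struct health_request {
    int target_node_id = -1; // -1 plays the role of node_id_not_set
};

struct health_reply {
    std::optional<int> reporter_id; // id carried inside the returned report
};

// Upgraded server: refuses requests addressed to another node, mirroring
// the guard added to service::do_collect_node_health_report.
std::optional<health_reply> new_server_handle(int self, const health_request& r) {
    if (r.target_node_id != -1 && r.target_node_id != self) {
        return std::nullopt; // stands in for errc::invalid_target_node_id
    }
    return health_reply{.reporter_id = self};
}

// Old server: predates the field and always answers with its own report.
health_reply old_server_handle(int self, const health_request&) {
    return health_reply{.reporter_id = self};
}

// Requester-side recheck, as map_reply_result now does: catches misdirected
// replies even when the server never ran the guard.
bool reply_is_valid(int requested_id, const health_reply& rep) {
    return rep.reporter_id.has_value() && *rep.reporter_id == requested_id;
}

int main() {
    health_request req{.target_node_id = 3};
    // Suppose a stale connection delivers the request to node 2 instead.
    assert(!new_server_handle(2, req).has_value());         // rejected server-side
    assert(!reply_is_valid(3, old_server_handle(2, req)));  // caught client-side
    return 0;
}
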
4 changes: 2 additions & 2 deletions src/v/cluster/health_monitor_types.cc
@@ -259,8 +259,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) {
return o;
}

std::ostream& operator<<(std::ostream& o, const get_node_health_request&) {
fmt::print(o, "{{}}");
std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) {
fmt::print(o, "{{target_node_id: {}}}", r.get_target_node_id());
return o;
}

12 changes: 10 additions & 2 deletions src/v/cluster/health_monitor_types.h
@@ -448,10 +448,13 @@ using force_refresh = ss::bool_class<struct hm_force_refresh_tag>;
class get_node_health_request
: public serde::envelope<
get_node_health_request,
serde::version<0>,
serde::version<1>,
serde::compat_version<0>> {
public:
using rpc_adl_exempt = std::true_type;
get_node_health_request() = default;
explicit get_node_health_request(model::node_id target_node_id)
: _target_node_id(target_node_id) {}

friend bool
operator==(const get_node_health_request&, const get_node_health_request&)
@@ -460,9 +463,14 @@ class get_node_health_request
friend std::ostream&
operator<<(std::ostream&, const get_node_health_request&);

auto serde_fields() { return std::tie(_filter); }
auto serde_fields() { return std::tie(_filter, _target_node_id); }
static constexpr model::node_id node_id_not_set{-1};

model::node_id get_target_node_id() const { return _target_node_id; }

private:
// default value for backward compatibility
model::node_id _target_node_id = node_id_not_set;
/**
* This field is no longer used, as it never was. It was made private on
* purpose
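Bumping the envelope to serde::version<1> while keeping compat_version<0> is what makes the new field safe on the wire: an old reader simply ignores trailing data it does not know about, and a new reader of a version-0 payload leaves _target_node_id at its default, node_id_not_set, which the service treats as "skip the validation". The toy decoder below illustrates that trailing-field behaviour outside of serde; the wire format and names here are invented for the example and are not serde's actual encoding.

#include <cassert>
#include <cstdint>
#include <vector>

// Toy wire format: the writer's version plus a flat list of int32 fields.
struct payload {
    uint8_t version;
    std::vector<int32_t> fields;
};

struct request_v1 {
    int32_t filter = 0;          // the pre-existing field
    int32_t target_node_id = -1; // default plays the role of node_id_not_set
};

// Version-tolerant decode: read only what the payload actually carries and
// leave newer members at their defaults, as a versioned envelope would.
request_v1 decode(const payload& p) {
    request_v1 r;
    if (!p.fields.empty()) {
        r.filter = p.fields[0];
    }
    if (p.version >= 1 && p.fields.size() > 1) {
        r.target_node_id = p.fields[1];
    }
    return r;
}

int main() {
    payload from_old_node{.version = 0, .fields = {42}};
    payload from_new_node{.version = 1, .fields = {42, 3}};
    assert(decode(from_old_node).target_node_id == -1); // old sender: validation skipped
    assert(decode(from_new_node).target_node_id == 3);  // new sender: id can be checked
    return 0;
}
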
35 changes: 22 additions & 13 deletions src/v/cluster/node_status_backend.cc
@@ -236,7 +236,7 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(
})
.then(&rpc::get_ctx_data<node_status_reply>);

co_return process_reply(reply);
co_return process_reply(target, reply);
}

ss::future<> node_status_backend::maybe_create_client(
@@ -245,26 +245,18 @@ ss::future<> node_status_backend::maybe_create_client(
target, address, _rpc_tls_config, create_backoff_policy());
}

result<node_status>
node_status_backend::process_reply(result<node_status_reply> reply) {
result<node_status> node_status_backend::process_reply(
model::node_id target_node_id, result<node_status_reply> reply) {
vassert(ss::this_shard_id() == shard, "invoked on a wrong shard");

if (!reply.has_error()) {
_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;

return node_status{
.node_id = replier_metadata.node_id,
.last_seen = rpc::clock_type::now()};
} else {
static constexpr auto rate_limit = std::chrono::seconds(1);
if (reply.has_error()) {
auto err = reply.error();
if (
err.category() == rpc::error_category()
&& static_cast<rpc::errc>(err.value())
== rpc::errc::client_request_timeout) {
_stats.rpcs_timed_out += 1;
}
static constexpr auto rate_limit = std::chrono::seconds(1);
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
@@ -273,6 +265,23 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
err.message());
return err;
}

_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;
if (replier_metadata.node_id != target_node_id) {
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
rate,
"Received reply from a node with a different node id. Expected: {}, "
"current: {}",
target_node_id,
replier_metadata.node_id);
return errc::invalid_target_node_id;
}

return node_status{
.node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()};
}

ss::future<node_status_reply>
3 changes: 2 additions & 1 deletion src/v/cluster/node_status_backend.h
@@ -76,7 +76,8 @@ class node_status_backend {
ss::future<> collect_and_store_updates();
ss::future<std::vector<node_status>> collect_updates_from_peers();

result<node_status> process_reply(result<node_status_reply>);
result<node_status> process_reply(
model::node_id target_node_id, result<node_status_reply> reply);
ss::future<node_status_reply> process_request(node_status_request);

ss::future<result<node_status>>
16 changes: 15 additions & 1 deletion src/v/cluster/service.cc
@@ -514,7 +514,21 @@ ss::future<get_cluster_health_reply> service::get_cluster_health_report(
}

ss::future<get_node_health_reply>
service::do_collect_node_health_report(get_node_health_request) {
service::do_collect_node_health_report(get_node_health_request req) {
// validate that the receiving node is the one that the request is
// addressed to
if (
req.get_target_node_id() != get_node_health_request::node_id_not_set
&& req.get_target_node_id() != _controller->self()) {
vlog(
clusterlog.debug,
"Received a get_node_health request addressed to a different node. "
"Requested node id: {}, current node id: {}",
req.get_target_node_id(),
_controller->self());
co_return get_node_health_reply{.error = errc::invalid_target_node_id};
}

auto res = co_await _hm_frontend.local().get_current_node_health();
if (res.has_error()) {
co_return get_node_health_reply{
1 change: 1 addition & 0 deletions src/v/kafka/server/errors.h
@@ -114,6 +114,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
case cluster::errc::data_migration_already_exists:
case cluster::errc::data_migration_not_exists:
case cluster::errc::data_migration_invalid_resources:
case cluster::errc::invalid_target_node_id:
break;
}
return error_code::unknown_server_error;
15 changes: 15 additions & 0 deletions tests/rptest/tests/node_folder_deletion_test.py
@@ -94,7 +94,22 @@ def test_deleting_node_folder(self):
wait_until(lambda: producer.produce_status.acked > 200000,
timeout_sec=120,
backoff_sec=0.5)

admin = Admin(self.redpanda)

# validate that the node with the deleted folder is recognized as offline
def removed_node_is_reported_offline():
cluster_health = admin.get_cluster_health_overview()
return id in cluster_health['nodes_down']

wait_until(
removed_node_is_reported_offline,
timeout_sec=20,
backoff_sec=0.5,
err_msg=
f"Node {id} is expected to be marked as offline as it was replaced by a new node"
)

# decommission a node that has been cleared
admin.decommission_broker(id)
waiter = NodeDecommissionWaiter(self.redpanda,