
Merge pull request #22811 from mmaslankaprv/CORE-5766-health-report-node-id

CORE-5766 Validate target node id when collecting health report
piyushredpanda authored Aug 10, 2024
2 parents c04195d + 08de93d commit 59e4e6e
Showing 10 changed files with 80 additions and 24 deletions.
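
At a high level, the change threads the intended target node id through the health-report RPC: the requester embeds the target node id in get_node_health_request, the receiving node rejects requests addressed to a different node with the new errc::invalid_target_node_id, and the requester additionally checks that the node id in the returned report matches the node it asked. The following is a minimal, self-contained sketch of that pattern; the type and function names (node_id, health_request, handle_request, validate_reply) are simplified stand-ins for illustration, not the actual Redpanda APIs.

// Standalone sketch of the validation pattern (C++17, simplified stand-in types).
#include <cstdint>
#include <iostream>
#include <optional>

using node_id = int32_t;
constexpr node_id node_id_not_set = -1; // mirrors get_node_health_request::node_id_not_set

enum class errc { success, invalid_target_node_id };

struct health_report {
    node_id id; // id of the node that produced the report
};

struct health_request {
    node_id target_node_id = node_id_not_set; // left unset by older senders
};

struct health_reply {
    errc error = errc::success;
    std::optional<health_report> report;
};

// Receiver side: answer only if the request is addressed to this node,
// or carries no target id (backward compatibility with older senders).
health_reply handle_request(node_id self, const health_request& req) {
    if (req.target_node_id != node_id_not_set && req.target_node_id != self) {
        return {errc::invalid_target_node_id, std::nullopt};
    }
    return {errc::success, health_report{self}};
}

// Requester side: cross-check that the report came from the node we asked,
// which catches e.g. a peer that was replaced but kept the same address.
errc validate_reply(node_id target, const health_reply& reply) {
    if (reply.error != errc::success || !reply.report.has_value()) {
        return reply.error;
    }
    return reply.report->id == target ? errc::success
                                      : errc::invalid_target_node_id;
}

int main() {
    health_request req{2};                        // addressed to node 2
    auto reply = handle_request(/*self=*/3, req); // but node 3 handles it
    bool rejected = validate_reply(2, reply) == errc::invalid_target_node_id;
    std::cout << std::boolalpha << rejected << '\n'; // true
}
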
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -95,6 +95,7 @@ enum class errc : int16_t {
data_migration_already_exists,
data_migration_invalid_resources,
resource_is_being_migrated,
invalid_target_node_id,
};

std::ostream& operator<<(std::ostream& o, errc err);
@@ -282,6 +283,8 @@ struct errc_category final : public std::error_category {
case errc::resource_is_being_migrated:
return "Requested operation can not be executed as the resource is "
"undergoing data migration";
case errc::invalid_target_node_id:
return "Request was intended for the node with different node id";
}
return "cluster::errc::unknown";
}
2 changes: 2 additions & 0 deletions src/v/cluster/errors.cc
@@ -178,6 +178,8 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) {
return o << "cluster::errc::data_migration_invalid_resources";
case errc::resource_is_being_migrated:
return o << "cluster::errc::resource_is_being_migrated";
case errc::invalid_target_node_id:
return o << "cluster::errc::invalid_target_node_id";
}
}
} // namespace cluster
13 changes: 8 additions & 5 deletions src/v/cluster/health_monitor_backend.cc
@@ -360,30 +360,33 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) {
ss::this_shard_id(),
id,
max_metadata_age(),
[timeout](controller_client_protocol client) mutable {
[timeout, id](controller_client_protocol client) mutable {
return client.collect_node_health_report(
get_node_health_request{}, rpc::client_opts(timeout));
get_node_health_request(id), rpc::client_opts(timeout));
})
.then(&rpc::get_ctx_data<get_node_health_reply>)
.then([this, id](result<get_node_health_reply> reply) {
return process_node_reply(id, std::move(reply));
});
}

result<node_health_report>
map_reply_result(result<get_node_health_reply> reply) {
result<node_health_report> map_reply_result(
model::node_id target_node_id, result<get_node_health_reply> reply) {
if (!reply) {
return {reply.error()};
}
if (!reply.value().report.has_value()) {
return {reply.value().error};
}
if (reply.value().report->id != target_node_id) {
return {errc::invalid_target_node_id};
}
return {std::move(*reply.value().report).to_in_memory()};
}

result<node_health_report> health_monitor_backend::process_node_reply(
model::node_id id, result<get_node_health_reply> reply) {
auto res = map_reply_result(std::move(reply));
auto res = map_reply_result(id, std::move(reply));
auto [status_it, _] = _status.try_emplace(id);
if (!res) {
vlog(
4 changes: 2 additions & 2 deletions src/v/cluster/health_monitor_types.cc
@@ -259,8 +259,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) {
return o;
}

std::ostream& operator<<(std::ostream& o, const get_node_health_request&) {
fmt::print(o, "{{}}");
std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) {
fmt::print(o, "{{target_node_id: {}}}", r.get_target_node_id());
return o;
}

12 changes: 10 additions & 2 deletions src/v/cluster/health_monitor_types.h
@@ -448,10 +448,13 @@ using force_refresh = ss::bool_class<struct hm_force_refresh_tag>;
class get_node_health_request
: public serde::envelope<
get_node_health_request,
serde::version<0>,
serde::version<1>,
serde::compat_version<0>> {
public:
using rpc_adl_exempt = std::true_type;
get_node_health_request() = default;
explicit get_node_health_request(model::node_id target_node_id)
: _target_node_id(target_node_id) {}

friend bool
operator==(const get_node_health_request&, const get_node_health_request&)
@@ -460,9 +463,14 @@ class get_node_health_request
friend std::ostream&
operator<<(std::ostream&, const get_node_health_request&);

auto serde_fields() { return std::tie(_filter); }
auto serde_fields() { return std::tie(_filter, _target_node_id); }
static constexpr model::node_id node_id_not_set{-1};

model::node_id get_target_node_id() const { return _target_node_id; }

private:
// default value for backward compatibility
model::node_id _target_node_id = node_id_not_set;
/**
* This field is no longer used, as it never was. It was made private on
* purpose
35 changes: 22 additions & 13 deletions src/v/cluster/node_status_backend.cc
@@ -236,7 +236,7 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(
})
.then(&rpc::get_ctx_data<node_status_reply>);

co_return process_reply(reply);
co_return process_reply(target, reply);
}

ss::future<> node_status_backend::maybe_create_client(
@@ -245,26 +245,18 @@ ss::future<> node_status_backend::maybe_create_client(
target, address, _rpc_tls_config, create_backoff_policy());
}

result<node_status>
node_status_backend::process_reply(result<node_status_reply> reply) {
result<node_status> node_status_backend::process_reply(
model::node_id target_node_id, result<node_status_reply> reply) {
vassert(ss::this_shard_id() == shard, "invoked on a wrong shard");

if (!reply.has_error()) {
_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;

return node_status{
.node_id = replier_metadata.node_id,
.last_seen = rpc::clock_type::now()};
} else {
static constexpr auto rate_limit = std::chrono::seconds(1);
if (reply.has_error()) {
auto err = reply.error();
if (
err.category() == rpc::error_category()
&& static_cast<rpc::errc>(err.value())
== rpc::errc::client_request_timeout) {
_stats.rpcs_timed_out += 1;
}
static constexpr auto rate_limit = std::chrono::seconds(1);
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
@@ -273,6 +265,23 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
err.message());
return err;
}

_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;
if (replier_metadata.node_id != target_node_id) {
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
rate,
"Received reply from node with different node id. Expected: {}, "
"current: {}",
target_node_id,
replier_metadata.node_id);
return errc::invalid_target_node_id;
}

return node_status{
.node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()};
}

ss::future<node_status_reply>
3 changes: 2 additions & 1 deletion src/v/cluster/node_status_backend.h
@@ -76,7 +76,8 @@ class node_status_backend {
ss::future<> collect_and_store_updates();
ss::future<std::vector<node_status>> collect_updates_from_peers();

result<node_status> process_reply(result<node_status_reply>);
result<node_status> process_reply(
model::node_id target_node_id, result<node_status_reply> reply);
ss::future<node_status_reply> process_request(node_status_request);

ss::future<result<node_status>>
16 changes: 15 additions & 1 deletion src/v/cluster/service.cc
@@ -514,7 +514,21 @@ ss::future<get_cluster_health_reply> service::get_cluster_health_report(
}

ss::future<get_node_health_reply>
service::do_collect_node_health_report(get_node_health_request) {
service::do_collect_node_health_report(get_node_health_request req) {
// validate that the receiving node is the one the request is
// addressed to
if (
req.get_target_node_id() != get_node_health_request::node_id_not_set
&& req.get_target_node_id() != _controller->self()) {
vlog(
clusterlog.debug,
"Received a get_node_health request addressed to different node. "
"Requested node id: {}, current node id: {}",
req.get_target_node_id(),
_controller->self());
co_return get_node_health_reply{.error = errc::invalid_target_node_id};
}

auto res = co_await _hm_frontend.local().get_current_node_health();
if (res.has_error()) {
co_return get_node_health_reply{
1 change: 1 addition & 0 deletions src/v/kafka/server/errors.h
@@ -114,6 +114,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
case cluster::errc::data_migration_already_exists:
case cluster::errc::data_migration_not_exists:
case cluster::errc::data_migration_invalid_resources:
case cluster::errc::invalid_target_node_id:
break;
}
return error_code::unknown_server_error;
15 changes: 15 additions & 0 deletions tests/rptest/tests/node_folder_deletion_test.py
@@ -94,7 +94,22 @@ def test_deleting_node_folder(self):
wait_until(lambda: producer.produce_status.acked > 200000,
timeout_sec=120,
backoff_sec=0.5)

admin = Admin(self.redpanda)

# validate that the node with deleted folder is recognized as offline
def removed_node_is_reported_offline():
cluster_health = admin.get_cluster_health_overview()
return id in cluster_health['nodes_down']

wait_until(
removed_node_is_reported_offline,
timeout_sec=20,
backoff_sec=0.5,
err_msg=
f"Node {id} is expected to be marked as offline as it was replaced by new node"
)

# decommission a node that has been cleared
admin.decommission_broker(id)
waiter = NodeDecommissionWaiter(self.redpanda,
