Skip to content

Commit

Permalink
c/status: validate responder node id when processing reply
Browse files Browse the repository at this point in the history
Added validation that checks if the node replying request is the one the
request was sent to. The validation is important as the receiving node
id might have changed while the RPC endpoint address stays the same.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 9, 2024
1 parent 221a0b7 commit c514c9e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
35 changes: 22 additions & 13 deletions src/v/cluster/node_status_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(
})
.then(&rpc::get_ctx_data<node_status_reply>);

co_return process_reply(reply);
co_return process_reply(target, reply);
}

ss::future<> node_status_backend::maybe_create_client(
Expand All @@ -245,26 +245,18 @@ ss::future<> node_status_backend::maybe_create_client(
target, address, _rpc_tls_config, create_backoff_policy());
}

result<node_status>
node_status_backend::process_reply(result<node_status_reply> reply) {
result<node_status> node_status_backend::process_reply(
model::node_id target_node_id, result<node_status_reply> reply) {
vassert(ss::this_shard_id() == shard, "invoked on a wrong shard");

if (!reply.has_error()) {
_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;

return node_status{
.node_id = replier_metadata.node_id,
.last_seen = rpc::clock_type::now()};
} else {
static constexpr auto rate_limit = std::chrono::seconds(1);
if (reply.has_error()) {
auto err = reply.error();
if (
err.category() == rpc::error_category()
&& static_cast<rpc::errc>(err.value())
== rpc::errc::client_request_timeout) {
_stats.rpcs_timed_out += 1;
}
static constexpr auto rate_limit = std::chrono::seconds(1);
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
Expand All @@ -273,6 +265,23 @@ node_status_backend::process_reply(result<node_status_reply> reply) {
err.message());
return err;
}

_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;
if (replier_metadata.node_id != target_node_id) {
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
rate,
"Received reply from node with different node id. Expected: {}, "
"current: {}",
target_node_id,
replier_metadata.node_id);
return errc::invalid_target_node_id;
}

return node_status{
.node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()};
}

ss::future<node_status_reply>
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/node_status_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class node_status_backend {
ss::future<> collect_and_store_updates();
ss::future<std::vector<node_status>> collect_updates_from_peers();

result<node_status> process_reply(result<node_status_reply>);
result<node_status> process_reply(
model::node_id target_node_id, result<node_status_reply> reply);
ss::future<node_status_reply> process_request(node_status_request);

ss::future<result<node_status>>
Expand Down

0 comments on commit c514c9e

Please sign in to comment.