From 221a0b76e7470e0b4b50d492e5b414f95b48906d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:41:16 +0000 Subject: [PATCH 1/6] c/errc: introduced new error code indicating targeting invalid node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduced an error code that indicates the node that the request was sent to is not the one that received it. Signed-off-by: Michał Maślanka --- src/v/cluster/errc.h | 3 +++ src/v/cluster/errors.cc | 2 ++ src/v/kafka/server/errors.h | 1 + 3 files changed, 6 insertions(+) diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index b8dfb4343125..9fa75a45f743 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -95,6 +95,7 @@ enum class errc : int16_t { data_migration_already_exists, data_migration_invalid_resources, resource_is_being_migrated, + invalid_target_node_id, }; std::ostream& operator<<(std::ostream& o, errc err); @@ -282,6 +283,8 @@ struct errc_category final : public std::error_category { case errc::resource_is_being_migrated: return "Requested operation can not be executed as the resource is " "undergoing data migration"; + case errc::invalid_target_node_id: + return "Request was intended for the node with different node id"; } return "cluster::errc::unknown"; } diff --git a/src/v/cluster/errors.cc b/src/v/cluster/errors.cc index b5bdf820589f..7380638a0013 100644 --- a/src/v/cluster/errors.cc +++ b/src/v/cluster/errors.cc @@ -178,6 +178,8 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) { return o << "cluster::errc::data_migration_invalid_resources"; case errc::resource_is_being_migrated: return o << "cluster::errc::resource_is_being_migrated"; + case errc::invalid_target_node_id: + return o << "cluster::errc::invalid_target_node_id"; } } } // namespace cluster diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index 538b51a150a4..36dc8cfecd2c 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -114,6 +114,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) { case cluster::errc::data_migration_already_exists: case cluster::errc::data_migration_not_exists: case cluster::errc::data_migration_invalid_resources: + case cluster::errc::invalid_target_node_id: break; } return error_code::unknown_server_error; From c514c9e8d7c8e2eebdc338c36832c4f9ed5464c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:42:39 +0000 Subject: [PATCH 2/6] c/status: validate responder node id when processing reply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added validation that checks if the node replying request is the one the request was sent to. The validation is important as the receiving node id might have changed while the RPC endpoint address stays the same. Signed-off-by: Michał Maślanka --- src/v/cluster/node_status_backend.cc | 35 +++++++++++++++++----------- src/v/cluster/node_status_backend.h | 3 ++- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc index f83fd68cb52b..9ade7687e0ea 100644 --- a/src/v/cluster/node_status_backend.cc +++ b/src/v/cluster/node_status_backend.cc @@ -236,7 +236,7 @@ ss::future> node_status_backend::send_node_status_request( }) .then(&rpc::get_ctx_data); - co_return process_reply(reply); + co_return process_reply(target, reply); } ss::future<> node_status_backend::maybe_create_client( @@ -245,18 +245,11 @@ ss::future<> node_status_backend::maybe_create_client( target, address, _rpc_tls_config, create_backoff_policy()); } -result -node_status_backend::process_reply(result reply) { +result node_status_backend::process_reply( + model::node_id target_node_id, result reply) { vassert(ss::this_shard_id() == shard, "invoked on a wrong shard"); - - if (!reply.has_error()) { - _stats.rpcs_sent += 1; - auto& replier_metadata = reply.value().replier_metadata; - - return node_status{ - .node_id = replier_metadata.node_id, - .last_seen = rpc::clock_type::now()}; - } else { + static constexpr auto rate_limit = std::chrono::seconds(1); + if (reply.has_error()) { auto err = reply.error(); if ( err.category() == rpc::error_category() @@ -264,7 +257,6 @@ node_status_backend::process_reply(result reply) { == rpc::errc::client_request_timeout) { _stats.rpcs_timed_out += 1; } - static constexpr auto rate_limit = std::chrono::seconds(1); static ss::logger::rate_limit rate(rate_limit); clusterlog.log( ss::log_level::debug, @@ -273,6 +265,23 @@ node_status_backend::process_reply(result reply) { err.message()); return err; } + + _stats.rpcs_sent += 1; + auto& replier_metadata = reply.value().replier_metadata; + if (replier_metadata.node_id != target_node_id) { + static ss::logger::rate_limit rate(rate_limit); + clusterlog.log( + ss::log_level::debug, + rate, + "Received reply from node with different node id. Expected: {}, " + "current: {}", + target_node_id, + replier_metadata.node_id); + return errc::invalid_target_node_id; + } + + return node_status{ + .node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()}; } ss::future diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h index e42a96411b24..677b8c36dc11 100644 --- a/src/v/cluster/node_status_backend.h +++ b/src/v/cluster/node_status_backend.h @@ -76,7 +76,8 @@ class node_status_backend { ss::future<> collect_and_store_updates(); ss::future> collect_updates_from_peers(); - result process_reply(result); + result process_reply( + model::node_id target_node_id, result reply); ss::future process_request(node_status_request); ss::future> From 7886aec1c34c34af7664dcd26799032cce65aa4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:45:04 +0000 Subject: [PATCH 3/6] c/health: added target node id to get_node_health_request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a field indicating what node the request was targeted to. If present the `target_node_id` will be validated when processing the request. Signed-off-by: Michał Maślanka --- src/v/cluster/health_monitor_types.cc | 4 ++-- src/v/cluster/health_monitor_types.h | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc index 727afb51ec95..384c5a402154 100644 --- a/src/v/cluster/health_monitor_types.cc +++ b/src/v/cluster/health_monitor_types.cc @@ -259,8 +259,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) { return o; } -std::ostream& operator<<(std::ostream& o, const get_node_health_request&) { - fmt::print(o, "{{}}"); +std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) { + fmt::print(o, "{{target_node_id: {}}}", r.get_target_node_id()); return o; } diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h index 25e9a3f2807d..c256cee9955e 100644 --- a/src/v/cluster/health_monitor_types.h +++ b/src/v/cluster/health_monitor_types.h @@ -448,10 +448,13 @@ using force_refresh = ss::bool_class; class get_node_health_request : public serde::envelope< get_node_health_request, - serde::version<0>, + serde::version<1>, serde::compat_version<0>> { public: using rpc_adl_exempt = std::true_type; + get_node_health_request() = default; + explicit get_node_health_request(model::node_id target_node_id) + : _target_node_id(target_node_id) {} friend bool operator==(const get_node_health_request&, const get_node_health_request&) @@ -460,9 +463,14 @@ class get_node_health_request friend std::ostream& operator<<(std::ostream&, const get_node_health_request&); - auto serde_fields() { return std::tie(_filter); } + auto serde_fields() { return std::tie(_filter, _target_node_id); } + static constexpr model::node_id node_id_not_set{-1}; + + model::node_id get_target_node_id() const { return _target_node_id; } private: + // default value for backward compatibility + model::node_id _target_node_id = node_id_not_set; /** * This field is no longer used, as it never was. It was made private on * purpose From 90eafa83727cdd2f8ab3bdd7db4c65b2c3c50cbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:46:52 +0000 Subject: [PATCH 4/6] c/health: validate target_node_id when collecting health report MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The health report is used to determine if a cluster node is online and available. When a node id changes but the RPC endpoint does not change the requester may incorrectly assume that the node with the previous node_id but the same endpoint is still operational. Added validation of the node that the request was sent to before collecting the health report. This way a sender will have correct information about the node availability as only the request targeted to the node with the correct node id will be replied with success. Fixes: CORE-5766 Signed-off-by: Michał Maślanka --- src/v/cluster/health_monitor_backend.cc | 4 ++-- src/v/cluster/service.cc | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 37182430ee18..90aace877444 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -360,9 +360,9 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) { ss::this_shard_id(), id, max_metadata_age(), - [timeout](controller_client_protocol client) mutable { + [timeout, id](controller_client_protocol client) mutable { return client.collect_node_health_report( - get_node_health_request{}, rpc::client_opts(timeout)); + get_node_health_request(id), rpc::client_opts(timeout)); }) .then(&rpc::get_ctx_data) .then([this, id](result reply) { diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index 4db372515e85..41ce17b4bd2c 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -514,7 +514,21 @@ ss::future service::get_cluster_health_report( } ss::future -service::do_collect_node_health_report(get_node_health_request) { +service::do_collect_node_health_report(get_node_health_request req) { + // validate if the receiving node is the one that that the request is + // addressed to + if ( + req.get_target_node_id() != get_node_health_request::node_id_not_set + && req.get_target_node_id() != _controller->self()) { + vlog( + clusterlog.debug, + "Received a get_node_health request addressed to different node. " + "Requested node id: {}, current node id: {}", + req.get_target_node_id(), + _controller->self()); + co_return get_node_health_reply{.error = errc::invalid_target_node_id}; + } + auto res = co_await _hm_frontend.local().get_current_node_health(); if (res.has_error()) { co_return get_node_health_reply{ From 6a8f39079b676d4af6ffc808419213ec51b0bcda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:51:02 +0000 Subject: [PATCH 5/6] tests: validate if node with old node is is reported as offline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The node folder deletion test checks if a node joins the cluster with the new node id after its data folder was deleted. Introduced a new validation checking if in this case the node with the old node_id is reported as offline Signed-off-by: Michał Maślanka --- tests/rptest/tests/node_folder_deletion_test.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/rptest/tests/node_folder_deletion_test.py b/tests/rptest/tests/node_folder_deletion_test.py index b7d15b7e28e8..fdf06f6efe80 100644 --- a/tests/rptest/tests/node_folder_deletion_test.py +++ b/tests/rptest/tests/node_folder_deletion_test.py @@ -94,7 +94,22 @@ def test_deleting_node_folder(self): wait_until(lambda: producer.produce_status.acked > 200000, timeout_sec=120, backoff_sec=0.5) + admin = Admin(self.redpanda) + + # validate that the node with deleted folder is recognized as offline + def removed_node_is_reported_offline(): + cluster_health = admin.get_cluster_health_overview() + return id in cluster_health['nodes_down'] + + wait_until( + removed_node_is_reported_offline, + timeout_sec=20, + backoff_sec=0.5, + err_msg= + f"Node {id} is expected to be marked as offline as it was replaced by new node" + ) + # decommission a node that has been cleared admin.decommission_broker(id) waiter = NodeDecommissionWaiter(self.redpanda, From 08de93db183bf9f596e19073b906a1afae602af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:56:00 +0000 Subject: [PATCH 6/6] c/health: added validation of received health report node_id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added validation of the node_id of the reply received from the node. The report is not considered as valid if the reply node id doesn't match the id of node the report was sent to. Signed-off-by: Michał Maślanka --- src/v/cluster/health_monitor_backend.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 90aace877444..ac0a47dddf38 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -370,20 +370,23 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) { }); } -result -map_reply_result(result reply) { +result map_reply_result( + model::node_id target_node_id, result reply) { if (!reply) { return {reply.error()}; } if (!reply.value().report.has_value()) { return {reply.value().error}; } + if (reply.value().report->id != target_node_id) { + return {errc::invalid_target_node_id}; + } return {std::move(*reply.value().report).to_in_memory()}; } result health_monitor_backend::process_node_reply( model::node_id id, result reply) { - auto res = map_reply_result(std::move(reply)); + auto res = map_reply_result(id, std::move(reply)); auto [status_it, _] = _status.try_emplace(id); if (!res) { vlog(