From cde06b64f1f487edfbae551de017f77cbff59514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:41:16 +0000 Subject: [PATCH 1/6] c/errc: introduced new error code indicating targeting invalid node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduced an error code that indicates the node that the request was sent to is not the one that received it. Signed-off-by: Michał Maślanka (cherry picked from commit 221a0b76e7470e0b4b50d492e5b414f95b48906d) --- src/v/cluster/errc.h | 3 +++ src/v/cluster/errors.cc | 2 ++ src/v/kafka/server/errors.h | 1 + 3 files changed, 6 insertions(+) diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index adff2d58b2ab..daed6ed16e2d 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -89,6 +89,7 @@ enum class errc : int16_t { topic_invalid_partitions_decreased, producer_ids_vcluster_limit_exceeded, validation_of_recovery_topic_failed, + invalid_target_node_id, }; std::ostream& operator<<(std::ostream& o, errc err); @@ -262,6 +263,8 @@ struct errc_category final : public std::error_category { return "To many vclusters registered in producer state cache"; case errc::validation_of_recovery_topic_failed: return "Validation of recovery topic failed"; + case errc::invalid_target_node_id: + return "Request was intended for the node with different node id"; } return "cluster::errc::unknown"; } diff --git a/src/v/cluster/errors.cc b/src/v/cluster/errors.cc index 5f4b47b83e76..d40b047d6c79 100644 --- a/src/v/cluster/errors.cc +++ b/src/v/cluster/errors.cc @@ -166,6 +166,8 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) { return o << "cluster::errc::producer_ids_vcluster_limit_exceeded"; case errc::validation_of_recovery_topic_failed: return o << "cluster::errc::validation_of_recovery_topic_failed"; + case errc::invalid_target_node_id: + return o << "cluster::errc::invalid_target_node_id"; } } } // namespace cluster diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index 361103d93749..7766c6204c31 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -107,6 +107,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) { case cluster::errc::waiting_for_shard_placement_update: case cluster::errc::producer_ids_vcluster_limit_exceeded: case cluster::errc::validation_of_recovery_topic_failed: + case cluster::errc::invalid_target_node_id: break; } return error_code::unknown_server_error; From e7550085c807cdfdabb3fee42b1f7c217d531b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:42:39 +0000 Subject: [PATCH 2/6] c/status: validate responder node id when processing reply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added validation that checks if the node replying request is the one the request was sent to. The validation is important as the receiving node id might have changed while the RPC endpoint address stays the same. Signed-off-by: Michał Maślanka (cherry picked from commit c514c9e8d7c8e2eebdc338c36832c4f9ed5464c0) --- src/v/cluster/node_status_backend.cc | 35 +++++++++++++++++----------- src/v/cluster/node_status_backend.h | 3 ++- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/v/cluster/node_status_backend.cc b/src/v/cluster/node_status_backend.cc index f83fd68cb52b..9ade7687e0ea 100644 --- a/src/v/cluster/node_status_backend.cc +++ b/src/v/cluster/node_status_backend.cc @@ -236,7 +236,7 @@ ss::future> node_status_backend::send_node_status_request( }) .then(&rpc::get_ctx_data); - co_return process_reply(reply); + co_return process_reply(target, reply); } ss::future<> node_status_backend::maybe_create_client( @@ -245,18 +245,11 @@ ss::future<> node_status_backend::maybe_create_client( target, address, _rpc_tls_config, create_backoff_policy()); } -result -node_status_backend::process_reply(result reply) { +result node_status_backend::process_reply( + model::node_id target_node_id, result reply) { vassert(ss::this_shard_id() == shard, "invoked on a wrong shard"); - - if (!reply.has_error()) { - _stats.rpcs_sent += 1; - auto& replier_metadata = reply.value().replier_metadata; - - return node_status{ - .node_id = replier_metadata.node_id, - .last_seen = rpc::clock_type::now()}; - } else { + static constexpr auto rate_limit = std::chrono::seconds(1); + if (reply.has_error()) { auto err = reply.error(); if ( err.category() == rpc::error_category() @@ -264,7 +257,6 @@ node_status_backend::process_reply(result reply) { == rpc::errc::client_request_timeout) { _stats.rpcs_timed_out += 1; } - static constexpr auto rate_limit = std::chrono::seconds(1); static ss::logger::rate_limit rate(rate_limit); clusterlog.log( ss::log_level::debug, @@ -273,6 +265,23 @@ node_status_backend::process_reply(result reply) { err.message()); return err; } + + _stats.rpcs_sent += 1; + auto& replier_metadata = reply.value().replier_metadata; + if (replier_metadata.node_id != target_node_id) { + static ss::logger::rate_limit rate(rate_limit); + clusterlog.log( + ss::log_level::debug, + rate, + "Received reply from node with different node id. Expected: {}, " + "current: {}", + target_node_id, + replier_metadata.node_id); + return errc::invalid_target_node_id; + } + + return node_status{ + .node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()}; } ss::future diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h index e42a96411b24..677b8c36dc11 100644 --- a/src/v/cluster/node_status_backend.h +++ b/src/v/cluster/node_status_backend.h @@ -76,7 +76,8 @@ class node_status_backend { ss::future<> collect_and_store_updates(); ss::future> collect_updates_from_peers(); - result process_reply(result); + result process_reply( + model::node_id target_node_id, result reply); ss::future process_request(node_status_request); ss::future> From f20ed7ab372f4e884c3f9cea6a8cc2a16ff0f4ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:45:04 +0000 Subject: [PATCH 3/6] c/health: added target node id to get_node_health_request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a field indicating what node the request was targeted to. If present the `target_node_id` will be validated when processing the request. Signed-off-by: Michał Maślanka (cherry picked from commit 7886aec1c34c34af7664dcd26799032cce65aa4a) --- src/v/cluster/health_monitor_types.cc | 4 ++-- src/v/cluster/health_monitor_types.h | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/health_monitor_types.cc b/src/v/cluster/health_monitor_types.cc index cc5d5be64b28..37a696ee42db 100644 --- a/src/v/cluster/health_monitor_types.cc +++ b/src/v/cluster/health_monitor_types.cc @@ -267,8 +267,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) { return o; } -std::ostream& operator<<(std::ostream& o, const get_node_health_request&) { - fmt::print(o, "{{}}"); +std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) { + fmt::print(o, "{{target_node_id: {}}}", r.get_target_node_id()); return o; } diff --git a/src/v/cluster/health_monitor_types.h b/src/v/cluster/health_monitor_types.h index afc4613088d3..5a3c9ebef88e 100644 --- a/src/v/cluster/health_monitor_types.h +++ b/src/v/cluster/health_monitor_types.h @@ -430,10 +430,13 @@ using force_refresh = ss::bool_class; class get_node_health_request : public serde::envelope< get_node_health_request, - serde::version<0>, + serde::version<1>, serde::compat_version<0>> { public: using rpc_adl_exempt = std::true_type; + get_node_health_request() = default; + explicit get_node_health_request(model::node_id target_node_id) + : _target_node_id(target_node_id) {} friend bool operator==(const get_node_health_request&, const get_node_health_request&) @@ -442,9 +445,14 @@ class get_node_health_request friend std::ostream& operator<<(std::ostream&, const get_node_health_request&); - auto serde_fields() { return std::tie(_filter); } + auto serde_fields() { return std::tie(_filter, _target_node_id); } + static constexpr model::node_id node_id_not_set{-1}; + + model::node_id get_target_node_id() const { return _target_node_id; } private: + // default value for backward compatibility + model::node_id _target_node_id = node_id_not_set; /** * This field is no longer used, as it never was. It was made private on * purpose From 08f328c3ac41bbed4b607fb50a12ae25d73a53db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:46:52 +0000 Subject: [PATCH 4/6] c/health: validate target_node_id when collecting health report MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The health report is used to determine if a cluster node is online and available. When a node id changes but the RPC endpoint does not change the requester may incorrectly assume that the node with the previous node_id but the same endpoint is still operational. Added validation of the node that the request was sent to before collecting the health report. This way a sender will have correct information about the node availability as only the request targeted to the node with the correct node id will be replied with success. Fixes: CORE-5766 Signed-off-by: Michał Maślanka (cherry picked from commit 90eafa83727cdd2f8ab3bdd7db4c65b2c3c50cbc) --- src/v/cluster/health_monitor_backend.cc | 4 ++-- src/v/cluster/service.cc | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 26305769192b..564f6ab39221 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -366,9 +366,9 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) { ss::this_shard_id(), id, max_metadata_age(), - [timeout](controller_client_protocol client) mutable { + [timeout, id](controller_client_protocol client) mutable { return client.collect_node_health_report( - get_node_health_request{}, rpc::client_opts(timeout)); + get_node_health_request(id), rpc::client_opts(timeout)); }) .then(&rpc::get_ctx_data) .then([this, id](result reply) { diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index 64a6213d2abf..02f80726af1b 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -510,7 +510,21 @@ ss::future service::get_cluster_health_report( } ss::future -service::do_collect_node_health_report(get_node_health_request) { +service::do_collect_node_health_report(get_node_health_request req) { + // validate if the receiving node is the one that that the request is + // addressed to + if ( + req.get_target_node_id() != get_node_health_request::node_id_not_set + && req.get_target_node_id() != _controller->self()) { + vlog( + clusterlog.debug, + "Received a get_node_health request addressed to different node. " + "Requested node id: {}, current node id: {}", + req.get_target_node_id(), + _controller->self()); + co_return get_node_health_reply{.error = errc::invalid_target_node_id}; + } + auto res = co_await _hm_frontend.local().get_current_node_health(); if (res.has_error()) { co_return get_node_health_reply{ From a010a744ad1445050f51012580635bc8b21148d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:51:02 +0000 Subject: [PATCH 5/6] tests: validate if node with old node is is reported as offline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The node folder deletion test checks if a node joins the cluster with the new node id after its data folder was deleted. Introduced a new validation checking if in this case the node with the old node_id is reported as offline Signed-off-by: Michał Maślanka (cherry picked from commit 6a8f39079b676d4af6ffc808419213ec51b0bcda) --- tests/rptest/tests/node_folder_deletion_test.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/rptest/tests/node_folder_deletion_test.py b/tests/rptest/tests/node_folder_deletion_test.py index b7d15b7e28e8..fdf06f6efe80 100644 --- a/tests/rptest/tests/node_folder_deletion_test.py +++ b/tests/rptest/tests/node_folder_deletion_test.py @@ -94,7 +94,22 @@ def test_deleting_node_folder(self): wait_until(lambda: producer.produce_status.acked > 200000, timeout_sec=120, backoff_sec=0.5) + admin = Admin(self.redpanda) + + # validate that the node with deleted folder is recognized as offline + def removed_node_is_reported_offline(): + cluster_health = admin.get_cluster_health_overview() + return id in cluster_health['nodes_down'] + + wait_until( + removed_node_is_reported_offline, + timeout_sec=20, + backoff_sec=0.5, + err_msg= + f"Node {id} is expected to be marked as offline as it was replaced by new node" + ) + # decommission a node that has been cleared admin.decommission_broker(id) waiter = NodeDecommissionWaiter(self.redpanda, From cc9b70a117625ac5c4917992d6c5d54cc5ca1c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Fri, 9 Aug 2024 09:56:00 +0000 Subject: [PATCH 6/6] c/health: added validation of received health report node_id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added validation of the node_id of the reply received from the node. The report is not considered as valid if the reply node id doesn't match the id of node the report was sent to. Signed-off-by: Michał Maślanka (cherry picked from commit 08de93db183bf9f596e19073b906a1afae602af0) --- src/v/cluster/health_monitor_backend.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/v/cluster/health_monitor_backend.cc b/src/v/cluster/health_monitor_backend.cc index 564f6ab39221..12858b2c2808 100644 --- a/src/v/cluster/health_monitor_backend.cc +++ b/src/v/cluster/health_monitor_backend.cc @@ -376,20 +376,23 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) { }); } -result -map_reply_result(result reply) { +result map_reply_result( + model::node_id target_node_id, result reply) { if (!reply) { return {reply.error()}; } if (!reply.value().report.has_value()) { return {reply.value().error}; } + if (reply.value().report->id != target_node_id) { + return {errc::invalid_target_node_id}; + } return {std::move(*reply.value().report)}; } result health_monitor_backend::process_node_reply( model::node_id id, result reply) { - auto res = map_reply_result(std::move(reply)); + auto res = map_reply_result(id, std::move(reply)); auto [status_it, _] = _status.try_emplace(id); if (!res) { vlog(