[TE] Add peer endpoint deletion notification #1499
chestnut-Q wants to merge 10 commits into kvcache-ai:main
Summary of Changes (posted by Gemini Code Assist)
This pull request introduces a critical feature for robust RDMA connection management by implementing a peer notification system for endpoint deletion. Previously, when an endpoint was destroyed or evicted locally, the remote peer might remain unaware, leading to stale connections and potential resource leaks. The new mechanism ensures that the peer is explicitly informed, allowing it to clean up its own resources and maintain a consistent view of active connections. This enhances the reliability and efficiency of the transfer engine by preventing dangling connections and improving resource utilization.
Code Review
This pull request introduces a mechanism to notify a peer when an RDMA endpoint is destroyed, for instance, due to cache eviction, by adding a new DeleteEndpoint handshake message type. The peer then cleans up its corresponding endpoint resources upon receiving this notification. However, security vulnerabilities were identified: a potential out-of-bounds access in the readString utility function could lead to a crash when handling zero-length messages. Additionally, the new endpoint deletion notification mechanism lacks authentication, which could allow an unauthenticated network attacker to trigger endpoint eviction for arbitrary NIC paths, potentially disrupting communications. From a code quality perspective, consider refactoring duplicated code for sending different types of handshake messages to improve maintainability.
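The notify-then-clean-up flow summarized above can be sketched as a small in-process model. This is only an illustration, not the PR's implementation: the `Node` class, its methods, and the NIC path strings are hypothetical, and a direct method call stands in for the DeleteEndpoint handshake message that the real code sends over a socket.

```cpp
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for a node that caches endpoints keyed by the
// peer's NIC path. Not the transfer engine's real data structure.
class Node {
   public:
    explicit Node(std::string nic_path) : nic_path_(std::move(nic_path)) {}

    // Establish a pair of endpoints, one on each side.
    void connect(Node &peer) {
        endpoints_.insert(peer.nic_path_);
        peer.endpoints_.insert(nic_path_);
    }

    // Evict the local endpoint, then notify the peer so it can drop the
    // matching endpoint instead of keeping a stale one.
    void evict(Node &peer) {
        if (endpoints_.erase(peer.nic_path_) > 0) {
            peer.onDeleteEndpoint(nic_path_);
        }
    }

    // Receiver side of the notification: remove the endpoint that pointed
    // at the NIC path the sender reported as deleted.
    void onDeleteEndpoint(const std::string &deleted_nic_path) {
        endpoints_.erase(deleted_nic_path);
    }

    bool hasEndpoint(const std::string &peer_nic_path) const {
        return endpoints_.count(peer_nic_path) > 0;
    }

   private:
    std::string nic_path_;
    std::unordered_set<std::string> endpoints_;
};
```

Without the call to `onDeleteEndpoint` inside `evict`, the peer would keep its entry after the local side has dropped its own, which is the stale-endpoint situation the PR sets out to avoid.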
    -    if (buffer[0] <= static_cast<char>(HandShakeRequestType::Notify)) {
    +    if (buffer[0] <= static_cast<char>(HandShakeRequestType::DeleteEndpoint)) {
             type = static_cast<HandShakeRequestType>(buffer[0]);
In the readString function, the length of the incoming message is read from the socket. While there is a check for the maximum length, there is no check to ensure that length is greater than 0 before accessing buffer[0]. If an attacker sends a message with length set to 0, buffer will be empty, and buffer[0] will result in an out-of-bounds access, potentially causing a crash (Denial of Service).
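The failure mode is easier to see in isolation. The sketch below is a hypothetical decoder, not the project's `readString`: the `RequestType` enum, its numeric values, and `decodeFrame` are assumptions made for illustration. It shows the empty-buffer guard placed before the first byte is read as a type tag.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical one-byte type tags; the real enum values are not shown in
// this PR excerpt.
enum class RequestType : unsigned char {
    Connection = 0,
    Metadata = 1,
    Notify = 2,
    DeleteEndpoint = 3,
};

// Decode a received frame into (type, payload). Returns std::nullopt for an
// empty frame or an unknown tag instead of indexing past the buffer.
std::optional<std::pair<RequestType, std::string>> decodeFrame(
    const std::vector<char> &buffer) {
    if (buffer.empty()) {
        return std::nullopt;  // zero-length frame: buffer[0] does not exist
    }
    const auto tag = static_cast<unsigned char>(buffer[0]);
    if (tag > static_cast<unsigned char>(RequestType::DeleteEndpoint)) {
        return std::nullopt;  // tag outside the known range
    }
    return std::make_pair(static_cast<RequestType>(tag),
                          std::string(buffer.begin() + 1, buffer.end()));
}
```

Rejecting unknown tags is a choice made for this sketch; the real function may treat such frames differently.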
    if (length == 0) {
        LOG(ERROR) << "readString: zero length from socket";
        return {type, ""};
    }
    if (buffer[0] <= static_cast<char>(HandShakeRequestType::DeleteEndpoint)) {

    int RdmaTransport::onDeleteEndpoint(const DeleteEndpointDesc &peer_desc,
                                        DeleteEndpointDesc &local_desc) {
        // Find local NIC name from target_nic_path (which is our local NIC path)
        auto local_nic_name = getNicNameFromNicPath(peer_desc.target_nic_path);
        if (local_nic_name.empty()) {
            LOG(WARNING) << "Invalid target NIC path in delete endpoint request: "
                         << peer_desc.target_nic_path;
            return ERR_INVALID_ARGUMENT;
        }

        // Find the corresponding RdmaContext
        for (auto &context : context_list_) {
            if (context->deviceName() == local_nic_name) {
                // Delete the endpoint for the deleted peer
                context->deleteEndpoint(peer_desc.deleted_nic_path);
                LOG(INFO) << "onDeleteEndpoint: deleted endpoint, "
                          << "deleted_nic_path=" << peer_desc.deleted_nic_path
                          << ", local_nic_path=" << peer_desc.target_nic_path;
                return 0;
            }
        }

        LOG(WARNING) << "onDeleteEndpoint: target_nic_path not found, "
                     << "target_nic_path=" << peer_desc.target_nic_path
                     << ", peer_nic_path=" << peer_desc.deleted_nic_path;
        return ERR_DEVICE_NOT_FOUND;
    }
The onDeleteEndpoint function processes requests to delete endpoints from the cache. These requests are received over the network via the SocketHandShakePlugin listener, which does not perform any authentication or verification of the sender. An attacker can send a DeleteEndpoint request with a spoofed deleted_nic_path, causing the corresponding endpoint to be evicted from the local cache. This can be used to disrupt communications between legitimate nodes (Denial of Service).
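One possible mitigation, sketched here purely as an assumption and not something this PR implements, is to record which network address each endpoint was established with and to honour a deletion only when it arrives from that address. `PeerRegistry`, its methods, and the example paths and addresses are all hypothetical. An address check is weaker than cryptographic authentication, so this only narrows the attack surface.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical registry mapping a peer NIC path to the address that peer
// used when the endpoint was first established.
class PeerRegistry {
   public:
    void registerPeer(const std::string &nic_path,
                      const std::string &peer_addr) {
        peers_[nic_path] = peer_addr;
    }

    // Allow a deletion only if the request comes from the address recorded
    // for the NIC path it claims to be deleting.
    bool mayDelete(const std::string &deleted_nic_path,
                   const std::string &sender_addr) const {
        const auto it = peers_.find(deleted_nic_path);
        return it != peers_.end() && it->second == sender_addr;
    }

   private:
    std::unordered_map<std::string, std::string> peers_;
};
```

A handler along the lines of `onDeleteEndpoint` could consult such a check before evicting anything, and log and reject requests that fail it.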
    virtual int sendDeleteEndpoint(std::string ip_or_host_name,
                                   uint16_t rpc_port, const Json::Value &local,
                                   Json::Value &peer) {
        struct addrinfo hints;
        struct addrinfo *result, *rp;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = globalConfig().use_ipv6 ? AF_INET6 : AF_INET;
        hints.ai_socktype = SOCK_STREAM;

        char service[16];
        sprintf(service, "%u", rpc_port);
        if (getaddrinfo(ip_or_host_name.c_str(), service, &hints, &result)) {
            PLOG(ERROR)
                << "SocketHandShakePlugin: failed to get IP address of peer "
                   "server "
                << ip_or_host_name << ":" << rpc_port
                << ", check DNS and /etc/hosts, or use IPv4 address instead";
            return ERR_DNS;
        }

        int ret = 0;
        for (rp = result; rp; rp = rp->ai_next) {
            ret = doSendDeleteEndpoint(rp, local, peer);
            if (ret == 0) {
                freeaddrinfo(result);
                return 0;
            }
            if (ret == ERR_MALFORMED_JSON) {
                return ret;
            }
        }

        freeaddrinfo(result);
        return ret;
    }

    int doSendDeleteEndpoint(struct addrinfo *addr,
                             const Json::Value &local_delete_endpoint,
                             Json::Value &peer_delete_endpoint) {
        int conn_fd = -1;
        int ret = doConnect(addr, conn_fd);
        if (ret) {
            return ret;
        }

        ret = writeString(conn_fd, HandShakeRequestType::DeleteEndpoint,
                          Json::FastWriter{}.write(local_delete_endpoint));
        if (ret) {
            LOG(ERROR) << "SocketHandShakePlugin: failed to send delete "
                          "endpoint message: "
                          "malformed json format, check tcp connection";
            close(conn_fd);
            return ret;
        }

        auto [type, json_str] = readString(conn_fd);
        if (type != HandShakeRequestType::DeleteEndpoint) {
            LOG(ERROR)
                << "SocketHandShakePlugin: unexpected handshake message type";
            close(conn_fd);
            return ERR_SOCKET;
        }

        std::string errs;
        if (!parseJsonString(json_str, peer_delete_endpoint, &errs)) {
            LOG(ERROR)
                << "SocketHandShakePlugin: failed to receive delete endpoint "
                   "message, malformed json format: "
                << errs;
            close(conn_fd);
            return ERR_MALFORMED_JSON;
        }

        close(conn_fd);
        return 0;
    }
The new methods sendDeleteEndpoint and doSendDeleteEndpoint are almost identical to existing methods like send/doSend, sendNotify/doSendNotify, and exchangeMetadata/doSendMetadata. This code duplication can make the code harder to maintain.
Consider refactoring this logic into a pair of generic helper methods, sendRequest and doSendRequest, to handle the common logic of sending a request and receiving a response. The specific send* methods would then become simple wrappers around sendRequest.
Here's an example of how doSendRequest could look:
    int doSendRequest(struct addrinfo *addr, HandShakeRequestType type,
                      const char *request_name, const Json::Value &local,
                      Json::Value &peer) {
        int conn_fd = -1;
        int ret = doConnect(addr, conn_fd);
        if (ret) {
            return ret;
        }
        ret = writeString(conn_fd, type, Json::FastWriter{}.write(local));
        if (ret) {
            LOG(ERROR) << "SocketHandShakePlugin: failed to send " << request_name
                       << " message: malformed json format, check tcp connection";
            close(conn_fd);
            return ret;
        }
        auto [resp_type, json_str] = readString(conn_fd);
        if (resp_type != type) {
            LOG(ERROR)
                << "SocketHandShakePlugin: unexpected handshake message type for "
                << request_name;
            close(conn_fd);
            return ERR_SOCKET;
        }
        std::string errs;
        if (!parseJsonString(json_str, peer, &errs)) {
            LOG(ERROR)
                << "SocketHandShakePlugin: failed to receive " << request_name
                << " message, malformed json format: "
                << errs;
            close(conn_fd);
            return ERR_MALFORMED_JSON;
        }
        close(conn_fd);
        return 0;
    }

And doSendDeleteEndpoint would simplify to:

    int doSendDeleteEndpoint(struct addrinfo *addr,
                             const Json::Value &local_delete_endpoint,
                             Json::Value &peer_delete_endpoint) {
        return doSendRequest(addr, HandShakeRequestType::DeleteEndpoint,
                             "delete endpoint", local_delete_endpoint,
                             peer_delete_endpoint);
    }

A similar refactoring can be applied to the send* methods. This would significantly reduce code duplication and improve maintainability.
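For the outer half of that refactoring, a generic resolve-and-retry helper might look roughly like the sketch below. It is an assumption about one possible shape, not code from the PR: the `sendRequest` signature, the callback parameter, and the `-1` placeholder for a DNS error code are all hypothetical, and it fixes the address family to `AF_INET` rather than reading a configuration flag. It uses only standard POSIX `getaddrinfo`/`freeaddrinfo` and wraps the result list in a `std::unique_ptr` so that every return path releases it.

```cpp
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <string>

// Hypothetical generic helper: resolve the peer, then call `attempt` on each
// resolved address until one attempt returns 0.
int sendRequest(const std::string &host, uint16_t port,
                const std::function<int(struct addrinfo *)> &attempt) {
    struct addrinfo hints;
    std::memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;  // assumption: IPv4 only in this sketch
    hints.ai_socktype = SOCK_STREAM;

    struct addrinfo *raw = nullptr;
    const std::string service = std::to_string(port);
    if (getaddrinfo(host.c_str(), service.c_str(), &hints, &raw) != 0) {
        return -1;  // placeholder for a DNS error code
    }
    // RAII ownership: freeaddrinfo runs on every exit from this function.
    std::unique_ptr<struct addrinfo, decltype(&freeaddrinfo)> result(
        raw, &freeaddrinfo);

    int ret = -1;
    for (struct addrinfo *rp = result.get(); rp != nullptr; rp = rp->ai_next) {
        ret = attempt(rp);
        if (ret == 0) {
            break;  // first successful address wins
        }
    }
    return ret;
}
```

Each send* wrapper would then pass a callback that forwards to the matching doSend* routine. The sketch omits the early exit on a malformed-JSON error that the quoted code performs; with the list owned by a smart pointer, such an exit could be added without a separate `freeaddrinfo` call.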
Sorry, we are cutting a new release that needs the CI resources, so I have cancelled the CI run for this PR. Please retrigger it later. Thanks.
mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp (review thread outdated and resolved)
LGTM. @stmatengss, could you help confirm this?
Description
Motivation
When an RDMA endpoint is deleted from one node's cache (due to capacity limits or engine destruction), the remote peer's corresponding endpoint becomes stale. This leads to endpoint state inconsistency between nodes, potentially causing connection errors and unnecessary resource consumption on the peer side.
Solution
This PR implements a bidirectional endpoint deletion notification mechanism:
Design
Type of Change
How Has This Been Tested?
Using transfer_engine_bench to test the endpoint deletion notification mechanism locally.
Checklist
./scripts/code_format.sh before submitting.