Skip to content

Comments

[TE] Add peer endpoint deletion notification#1499

Open
chestnut-Q wants to merge 10 commits intokvcache-ai:mainfrom
chestnut-Q:delete-dual-endpoint
Open

[TE] Add peer endpoint deletion notification#1499
chestnut-Q wants to merge 10 commits intokvcache-ai:mainfrom
chestnut-Q:delete-dual-endpoint

Conversation

@chestnut-Q
Copy link
Collaborator

@chestnut-Q chestnut-Q commented Feb 5, 2026

Description

Motivation

When an RDMA endpoint is deleted from one node's cache (due to capacity limits or engine destruction), the remote peer's corresponding endpoint becomes stale. This leads to endpoint state inconsistency between nodes, potentially causing connection errors and unnecessary resource consumption on the peer side.

Solution

This PR implements a bidirectional endpoint deletion notification mechanism:

  1. When a local endpoint is destroyed, it sends a notification to the remote peer
  2. The remote peer receives the notification and deletes its corresponding endpoint

Design

Node A (endpoint destroyed)                      Node B (receives notification)
┌──────────────────────────────┐                 ┌──────────────────────────────┐
│                              │                 │                              │
│  RdmaEndPoint::deconstruct() │                 │  RdmaTransport               │
│         │                    │                 │    ::onDeleteEndpoint()      │
│         ▼                    │    TCP/Socket   │         │                    │
│  OnDeleteCallback            │  ─────────────► │         ▼                    │
│         │                    │                 │  EndpointStore               │
│         ▼                    │                 │    ::deleteEndpoint()        │
│  RdmaContext                 │                 │                              │
│    ::notifyPeerEndpointDeletion()              │                              │
│                              │                 │                              │
└──────────────────────────────┘                 └──────────────────────────────┘

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Using transfer_engine_bench to test the endpoint deletion notification mechanism locally.

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @chestnut-Q, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a critical feature for robust RDMA connection management by implementing a peer notification system for endpoint deletion. Previously, when an endpoint was destroyed or evicted locally, the remote peer might remain unaware, leading to stale connections and potential resource leaks. The new mechanism ensures that the peer is explicitly informed, allowing it to clean up its own resources and maintain a consistent view of active connections. This enhances the reliability and efficiency of the transfer engine by preventing dangling connections and improving resource utilization.

Highlights

  • New Endpoint Deletion Notification Protocol: Introduced a new DeleteEndpoint handshake request type to the HandShakeRequestType enum, enabling explicit notification between peers when an endpoint is being destroyed or evicted.
  • Endpoint Deletion Callback Mechanism: Implemented a callback mechanism (OnDeleteEndpointCallback) within RdmaEndPoint and EndpointStore to trigger peer notification upon endpoint destruction. This ensures that the remote side is aware of the local endpoint's removal.
  • Integration with Transfer Metadata and Transport Layers: Extended TransferMetadata and HandShakePlugin to support the new DeleteEndpoint message type, including serialization/deserialization logic for DeleteEndpointDesc and methods for sending/receiving these notifications.
  • Automatic Peer Notification on Endpoint Eviction: Configured RdmaContext to register a deletion callback with the EndpointStore, which automatically sends a DeleteEndpoint notification to the peer when an endpoint is evicted from the cache (e.g., due to cache policy).

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • mooncake-transfer-engine/include/common.h
    • Added DeleteEndpoint = 3 to HandShakeRequestType enum.
    • Updated readString to correctly parse the new DeleteEndpoint type.
    • Ensured file ends with a newline character.
  • mooncake-transfer-engine/include/transfer_metadata.h
    • Defined struct DeleteEndpointDesc to encapsulate information about deleted endpoints.
    • Added OnReceiveDeleteEndpoint callback type for handling incoming deletion requests.
    • Introduced registerDeleteEndpointCallback and sendDeleteEndpoint methods to TransferMetadata.
  • mooncake-transfer-engine/include/transfer_metadata_plugin.h
    • Added virtual methods registerOnDeleteEndpointCallBack and sendDeleteEndpoint to the HandShakePlugin interface.
    • Ensured file ends with a newline character.
  • mooncake-transfer-engine/include/transport/rdma_transport/endpoint_store.h
    • Included <functional> for callback types.
    • Defined OnDeleteEndpointCallback for endpoint deletion notifications.
    • Added a virtual destructor ~EndpointStore() to the base class.
    • Introduced setOnDeleteEndpointCallback virtual method to EndpointStore.
    • Added on_delete_endpoint_callback_ member and implemented setOnDeleteEndpointCallback in FIFOEndpointStore and SIEVEEndpointStore.
  • mooncake-transfer-engine/include/transport/rdma_transport/rdma_context.h
    • Added notifyPeerEndpointDeletion method to RdmaContext.
  • mooncake-transfer-engine/include/transport/rdma_transport/rdma_endpoint.h
    • Included <functional> for callback types.
    • Defined OnDeleteCallback for endpoint deletion.
    • Modified construct method to accept an OnDeleteCallback.
    • Added peerNicPath() and localNicPath() getter methods.
    • Added on_delete_callback_ member to store the deletion callback.
  • mooncake-transfer-engine/include/transport/rdma_transport/rdma_transport.h
    • Added DeleteEndpointDesc as a type alias.
    • Introduced onDeleteEndpoint and sendDeleteEndpoint methods to RdmaTransport.
  • mooncake-transfer-engine/src/transfer_metadata.cpp
    • Implemented TransferDeleteEndpointUtil for JSON encoding and decoding of DeleteEndpointDesc.
    • Provided implementations for TransferMetadata::registerDeleteEndpointCallback and TransferMetadata::sendDeleteEndpoint.
  • mooncake-transfer-engine/src/transfer_metadata_plugin.cpp
    • Implemented SocketHandShakePlugin::registerOnDeleteEndpointCallBack.
    • Modified SocketHandShakePlugin::handleClientConnection to process HandShakeRequestType::DeleteEndpoint messages.
    • Implemented SocketHandShakePlugin::sendDeleteEndpoint and doSendDeleteEndpoint for network communication.
    • Added on_delete_endpoint_callback_ member to SocketHandShakePlugin.
    • Ensured file ends with a newline character.
  • mooncake-transfer-engine/src/transport/rdma_transport/endpoint_store.cpp
    • Passed the on_delete_endpoint_callback_ to RdmaEndPoint::construct when inserting new endpoints in FIFOEndpointStore and SIEVEEndpointStore.
    • Updated evictEndpoint in both store types to log local and peer NIC paths upon eviction.
    • Added a comment in reclaimEndpoint clarifying that the deletion callback is invoked by the endpoint's destructor.
  • mooncake-transfer-engine/src/transport/rdma_transport/rdma_context.cpp
    • Set the onDeleteEndpointCallback for the endpoint_store_ to call notifyPeerEndpointDeletion.
    • Implemented notifyPeerEndpointDeletion to construct and send DeleteEndpointDesc messages to peers.
  • mooncake-transfer-engine/src/transport/rdma_transport/rdma_endpoint.cpp
    • Included <utility> for std::exchange.
    • Modified construct to store the provided OnDeleteCallback.
    • Updated deconstruct to invoke the stored on_delete_callback_ if it exists, before destroying QPs.
  • mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp
    • Registered RdmaTransport::onDeleteEndpoint as the callback for TransferMetadata::registerDeleteEndpointCallback.
    • Implemented RdmaTransport::onDeleteEndpoint to process incoming deletion notifications by finding the relevant RdmaContext and calling deleteEndpoint.
Activity
  • The author, chestnut-Q, has marked this as a new feature for the Transfer Engine.
  • The author has performed a self-review and formatted the code.
  • Tests have been added to prove the changes are effective.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to notify a peer when an RDMA endpoint is destroyed, for instance, due to cache eviction, by adding a new DeleteEndpoint handshake message type. The peer then cleans up its corresponding endpoint resources upon receiving this notification. However, security vulnerabilities were identified: a potential out-of-bounds access in the readString utility function could lead to a crash when handling zero-length messages. Additionally, the new endpoint deletion notification mechanism lacks authentication, which could allow an unauthenticated network attacker to trigger endpoint eviction for arbitrary NIC paths, potentially disrupting communications. From a code quality perspective, consider refactoring duplicated code for sending different types of handshake messages to improve maintainability.


if (buffer[0] <= static_cast<char>(HandShakeRequestType::Notify)) {
if (buffer[0] <= static_cast<char>(HandShakeRequestType::DeleteEndpoint)) {
type = static_cast<HandShakeRequestType>(buffer[0]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

In the readString function, the length of the incoming message is read from the socket. While there is a check for the maximum length, there is no check to ensure that length is greater than 0 before accessing buffer[0]. If an attacker sends a message with length set to 0, buffer will be empty, and buffer[0] will result in an out-of-bounds access, potentially causing a crash (Denial of Service).

    if (length == 0) {
        LOG(ERROR) << "readString: zero length from socket";
        return {type, ""};
    }

    if (buffer[0] <= static_cast<char>(HandShakeRequestType::DeleteEndpoint)) {

Comment on lines 681 to 707
int RdmaTransport::onDeleteEndpoint(const DeleteEndpointDesc &peer_desc,
DeleteEndpointDesc &local_desc) {
// Find local NIC name from target_nic_path (which is our local NIC path)
auto local_nic_name = getNicNameFromNicPath(peer_desc.target_nic_path);
if (local_nic_name.empty()) {
LOG(WARNING) << "Invalid target NIC path in delete endpoint request: "
<< peer_desc.target_nic_path;
return ERR_INVALID_ARGUMENT;
}

// Find the corresponding RdmaContext
for (auto &context : context_list_) {
if (context->deviceName() == local_nic_name) {
// Delete the endpoint for the deleted peer
context->deleteEndpoint(peer_desc.deleted_nic_path);
LOG(INFO) << "onDeleteEndpoint: deleted endpoint, "
<< "deleted_nic_path=" << peer_desc.deleted_nic_path
<< ", local_nic_path=" << peer_desc.target_nic_path;
return 0;
}
}

LOG(WARNING) << "onDeleteEndpoint: target_nic_path not found, "
<< "target_nic_path=" << peer_desc.target_nic_path
<< ", peer_nic_path=" << peer_desc.deleted_nic_path;
return ERR_DEVICE_NOT_FOUND;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The onDeleteEndpoint function processes requests to delete endpoints from the cache. These requests are received over the network via the SocketHandShakePlugin listener, which does not perform any authentication or verification of the sender. An attacker can send a DeleteEndpoint request with a spoofed deleted_nic_path, causing the corresponding endpoint to be evicted from the local cache. This can be used to disrupt communications between legitimate nodes (Denial of Service).

Comment on lines 1084 to 1159
virtual int sendDeleteEndpoint(std::string ip_or_host_name,
uint16_t rpc_port, const Json::Value &local,
Json::Value &peer) {
struct addrinfo hints;
struct addrinfo *result, *rp;
memset(&hints, 0, sizeof(hints));
hints.ai_family = globalConfig().use_ipv6 ? AF_INET6 : AF_INET;
hints.ai_socktype = SOCK_STREAM;

char service[16];
sprintf(service, "%u", rpc_port);
if (getaddrinfo(ip_or_host_name.c_str(), service, &hints, &result)) {
PLOG(ERROR)
<< "SocketHandShakePlugin: failed to get IP address of peer "
"server "
<< ip_or_host_name << ":" << rpc_port
<< ", check DNS and /etc/hosts, or use IPv4 address instead";
return ERR_DNS;
}

int ret = 0;
for (rp = result; rp; rp = rp->ai_next) {
ret = doSendDeleteEndpoint(rp, local, peer);
if (ret == 0) {
freeaddrinfo(result);
return 0;
}
if (ret == ERR_MALFORMED_JSON) {
return ret;
}
}

freeaddrinfo(result);
return ret;
}

int doSendDeleteEndpoint(struct addrinfo *addr,
const Json::Value &local_delete_endpoint,
Json::Value &peer_delete_endpoint) {
int conn_fd = -1;
int ret = doConnect(addr, conn_fd);
if (ret) {
return ret;
}

ret = writeString(conn_fd, HandShakeRequestType::DeleteEndpoint,
Json::FastWriter{}.write(local_delete_endpoint));
if (ret) {
LOG(ERROR) << "SocketHandShakePlugin: failed to send delete "
"endpoint message: "
"malformed json format, check tcp connection";
close(conn_fd);
return ret;
}

auto [type, json_str] = readString(conn_fd);
if (type != HandShakeRequestType::DeleteEndpoint) {
LOG(ERROR)
<< "SocketHandShakePlugin: unexpected handshake message type";
close(conn_fd);
return ERR_SOCKET;
}

std::string errs;
if (!parseJsonString(json_str, peer_delete_endpoint, &errs)) {
LOG(ERROR)
<< "SocketHandShakePlugin: failed to receive delete endpoint "
"message, malformed json format: "
<< errs;
close(conn_fd);
return ERR_MALFORMED_JSON;
}

close(conn_fd);
return 0;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new methods sendDeleteEndpoint and doSendDeleteEndpoint are almost identical to existing methods like send/doSend, sendNotify/doSendNotify, and exchangeMetadata/doSendMetadata. This code duplication can make the code harder to maintain.

Consider refactoring this logic into a pair of generic helper methods, sendRequest and doSendRequest, to handle the common logic of sending a request and receiving a response. The specific send* methods would then become simple wrappers around sendRequest.

Here's an example of how doSendRequest could look:

int doSendRequest(struct addrinfo *addr, HandShakeRequestType type, const char* request_name, const Json::Value &local, Json::Value &peer) {
    int conn_fd = -1;
    int ret = doConnect(addr, conn_fd);
    if (ret) {
        return ret;
    }

    ret = writeString(conn_fd, type, Json::FastWriter{}.write(local));
    if (ret) {
        LOG(ERROR) << "SocketHandShakePlugin: failed to send " << request_name
                      << " message: malformed json format, check tcp connection";
        close(conn_fd);
        return ret;
    }

    auto [resp_type, json_str] = readString(conn_fd);
    if (resp_type != type) {
        LOG(ERROR)
            << "SocketHandShakePlugin: unexpected handshake message type for " << request_name;
        close(conn_fd);
        return ERR_SOCKET;
    }

    std::string errs;
    if (!parseJsonString(json_str, peer, &errs)) {
        LOG(ERROR)
            << "SocketHandShakePlugin: failed to receive " << request_name
               << " message, malformed json format: "
            << errs;
        close(conn_fd);
        return ERR_MALFORMED_JSON;
    }

    close(conn_fd);
    return 0;
}

And doSendDeleteEndpoint would simplify to:

int doSendDeleteEndpoint(struct addrinfo *addr,
                         const Json::Value &local_delete_endpoint,
                         Json::Value &peer_delete_endpoint) {
    return doSendRequest(addr, HandShakeRequestType::DeleteEndpoint, "delete endpoint", local_delete_endpoint, peer_delete_endpoint);
}

A similar refactoring can be applied to the send* methods. This would significantly reduce code duplication and improve maintainability.

@chestnut-Q chestnut-Q changed the title [TE]: Delete dual endpoint when destroying endpoint [TE] Add peer endpoint deletion notification Feb 5, 2026
@chestnut-Q chestnut-Q marked this pull request as ready for review February 5, 2026 09:14
@ShangmingCai
Copy link
Collaborator

Sorry, we are making a new release that requires the CI resource, and I have cancelled the CI for this PR, please retrigger it later. Thx.

@alogfans
Copy link
Collaborator

LGTM. You can help me to confirm this. @stmatengss

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants