[PG] Implement circular buffer for efficient send/recv#1503

Closed
yuechen-sys wants to merge 4 commits into kvcache-ai:main from yuechen-sys:pr/p2p-improve

Conversation

@yuechen-sys
Collaborator

@yuechen-sys yuechen-sys commented Feb 5, 2026

Description

This PR introduces a series of improvements to send/recv communication in the Mooncake backend. These changes address the performance issues reported in Issue #1421.

Changes

  • Fix inefficient P2P transfers for large tensors by introducing a ring buffer, enabling chunked writes and eliminating extra TCP-based synchronization.
  • Decouple the control and data planes: the send worker becomes copy-only, while the control worker is responsible for submitting RDMA writes, polling for completion, and updating the remote head pointer.
  • Temporarily use dedicated buffers for send/recv.
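The ring-buffer flow above can be sketched as a self-contained, host-only model (sizes, names, and the in-process "drain" standing in for the remote RDMA side are all illustrative, not the backend's actual types):

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative constants; the real values live in mooncake_worker.cuh.
constexpr uint32_t kSlots = 8;
constexpr size_t kSlotSize = 16;

struct Ring {
    uint8_t data[kSlots * kSlotSize];
    uint32_t head = 0;  // next slot the sender fills
    uint32_t tail = 0;  // next slot the receiver drains
    bool full() const { return (head + 1) % kSlots == tail; }
};

// Copy `src` into the ring one slot-sized chunk at a time, draining as we
// go. In the real backend the drain happens on the remote peer via chunked
// RDMA writes, and the sender spins (PAUSE()) while the ring is full.
std::vector<uint8_t> roundTrip(const std::vector<uint8_t>& src) {
    Ring ring;
    std::vector<uint8_t> out;
    size_t sent = 0;
    while (sent < src.size()) {
        assert(!ring.full());  // real code waits here instead of asserting
        size_t chunk = std::min(src.size() - sent, kSlotSize);
        std::memcpy(ring.data + ring.head * kSlotSize, src.data() + sent,
                    chunk);
        ring.head = (ring.head + 1) % kSlots;
        sent += chunk;
        // Receiver side: drain the oldest slot and advance the tail.
        out.insert(out.end(), ring.data + ring.tail * kSlotSize,
                   ring.data + ring.tail * kSlotSize + chunk);
        ring.tail = (ring.tail + 1) % kSlots;
    }
    return out;
}
```

Because only the sender advances `head` and only the receiver advances `tail`, a tensor larger than the whole buffer can stream through in `kSlotSize` chunks without any extra TCP round-trip per message.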

Future Plans

  • Intra-node P2P performance is currently bounded by the RDMA-only Mooncake TE. Switching to Mooncake TENT is expected to close the performance gap between Mooncake PG and NCCL.
  • Support for efficient batch_isend_irecv is currently a work in progress.

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Send/receive performance was measured with a thorough in-house benchmark similar to nccl-tests. Measurements for small messages are imprecise due to the lack of a barrier in the Mooncake backend.

(bandwidth and latency benchmark charts)

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

yuechen-sys and others added 2 commits February 5, 2026 18:43
- Support large tensors via ring buffer
- Reduce extra sync to improve P2P perf
- Temporarily make P2P buffers independent
- Introduce control worker to submit transfers and update remote head
- Make send worker only copy data into slots
- Format code
@gemini-code-assist
Contributor

Summary of Changes

Hello @yuechen-sys, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers a substantial upgrade to the Mooncake backend's point-to-point communication capabilities, specifically targeting performance bottlenecks in large tensor transfers. By implementing a sophisticated circular buffer architecture, the system can now handle data in chunks, bypassing previous synchronization overheads. A key architectural change is the decoupling of data transfer from control logic, assigning a dedicated worker to manage RDMA operations and remote state updates. This refactoring directly addresses performance issues and significantly improves the efficiency of distributed operations within the Mooncake framework.

Highlights

  • Circular Buffer Implementation: Introduced a circular buffer mechanism for P2P send/receive operations to enable chunked data transfers and eliminate inefficient TCP-based synchronization for large tensors.
  • Decoupled Control and Data Planes: The P2P communication logic has been refactored to separate the data copying (send worker) from the control plane (new control worker), which is now responsible for submitting RDMA writes, polling for completion, and updating remote head pointers.
  • Performance Improvement: Significant performance gains have been observed in send/receive operations for Mooncake PG, particularly for larger tensor sizes, as evidenced by the provided bandwidth and latency benchmarks.
  • Dedicated P2P Buffers and Control Structures: New dedicated P2P send/receive buffers and control slot structures (P2PControlSlot) have been added and integrated into the backend's memory management and synchronization.


Changelog
  • mooncake-pg/include/mooncake_backend.h
    • Added P2PSendOpTracker and P2PSendTask structs for managing P2P send operations and their completion.
    • Declared a new p2PCtrlWorkerThread() function.
    • Introduced new member variables for P2P send/receive buffers (p2p_send_buffer_, p2p_recv_buffer_) and P2P control slots (p2p_ctrl_send_region_, p2p_ctrl_recv_region_).
    • Added member variables for managing the P2P send task queue, mutex, condition variable, sequence numbers, next pointers, local head pointers, and a control worker thread.
  • mooncake-pg/include/mooncake_worker.cuh
    • Modified P2P buffer constants: kP2PNumSlots reduced to 8, kP2PSlotSize redefined based on kP2PBufferSize.
    • Introduced P2PControlSlot struct with alignas(64) for cache-aligned head and tail pointers in the circular buffer.
    • Added p2p_send_buffer, p2p_recv_buffer, p2p_ctrl_send, and p2p_ctrl_recv to SegmentInfo to store addresses of new P2P buffers and control regions.
    • Removed p2pSendLowestInFlight and p2pRecvLowestInFlight from TransferGroupMeta.
  • mooncake-pg/src/mooncake_backend.cpp
    • Defined new constants (kP2PBytesPerRank, kP2PNumSlotsPerRank, kP2PTotalBufferSize) and added static assertions for P2P buffer configuration.
    • Allocated and registered p2p_send_buffer_ and p2p_recv_buffer_ (both CPU and CUDA) in the MooncakeBackend constructor.
    • Initialized and registered p2p_ctrl_send_region_ and p2p_ctrl_recv_region_.
    • Updated rank_info to include the new P2P buffer and control region addresses.
    • Refactored the destructor to call a new shutdown() method for proper resource deallocation.
    • Removed old P2P control key generation functions (makeP2PCtrlKey, makeP2PDoneKey, makeP2PSlotKey).
    • Removed previous size and slot checks from send() and recv() methods, as these are now handled by the circular buffer logic.
    • Added engine_.freeBatchID(batchID); in connectionPoller after processing a batch.
    • Started the new p2pCtrlWorkerThread_ in startP2PWorker() and ensured its proper shutdown in stopP2PWorker().
    • Removed direct completion signaling from p2PSendWorkerThread(), as completion is now managed by P2PSendOpTracker within the control worker.
    • Implemented p2PCtrlWorkerThread() to manage the submission of RDMA write tasks for data chunks and update remote head pointers, utilizing a task queue for synchronization.
    • Refactored processSendOp() to copy data in chunks to the local P2P send buffer and enqueue P2PSendTask objects for the control worker, incorporating PAUSE() for slot availability.
    • Refactored processRecvOp() to wait for remote head updates, copy received data chunks from the local P2P receive buffer, and update the local tail pointer, which is then written to the remote control region, replacing old store-based synchronization.
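As a rough illustration of the cache-aligned control slot described in the changelog (field names follow the changelog; the authoritative definition is in mooncake-pg/include/mooncake_worker.cuh, and whether the fields are atomic there is not shown here):

```cpp
#include <atomic>
#include <cstdint>

// Illustrative layout: each pointer gets its own 64-byte cache line, so the
// peer's RDMA writes to one field and local updates to the other never
// false-share a line.
struct P2PControlSlot {
    alignas(64) std::atomic<uint32_t> head;  // advanced as chunks arrive
    alignas(64) std::atomic<uint32_t> tail;  // advanced as chunks are drained
};

static_assert(alignof(P2PControlSlot) == 64, "cache-line aligned");
static_assert(sizeof(P2PControlSlot) == 128, "one cache line per pointer");
```

The `alignas(64)` on each member forces the compiler to pad the struct so a remote head update and a local tail update touch distinct cache lines.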
Activity
  • The pull request was authored by yuechen-sys.
  • The author provided a detailed description outlining the purpose, specific changes, and future plans for the P2P communication improvements.
  • Extensive performance benchmarks (bandwidth and latency tables, and graphs) were included, demonstrating significant improvements for Mooncake PG compared to its original state, especially for larger message sizes.
  • The author confirmed self-review and code formatting according to project standards.
  • The changes introduce a new p2PCtrlWorkerThread to manage the control plane of P2P transfers, indicating a significant architectural shift.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant architectural change to the P2P communication in the Mooncake backend, replacing the previous mechanism with a more efficient circular buffer implementation. This is a great improvement that yields substantial performance gains as shown in the benchmarks. The decoupling of the data and control planes is well-executed. My review focuses on improving the robustness and maintainability of the new implementation. I've identified some areas for improvement in error handling, where failures could lead to crashes instead of being gracefully propagated. I've also pointed out a few busy-wait loops that could be optimized to reduce CPU consumption, and some code duplication that could be refactored for better maintainability.

Comment on lines +1198 to +1201
} else if (status.s == TransferStatusEnum::FAILED) {
meta_.engine->freeBatchID(task.batchID);
TORCH_CHECK(false, "P2P send transfer failed.");
} else {

high

A failed RDMA transfer causes a TORCH_CHECK failure, which will terminate the process. This is inconsistent with the error handling in other parts of the code (like p2PSendWorkerThread) which propagate errors to the Work object. Consider propagating the error via the P2PSendOpTracker instead of crashing. This would make the error handling more robust and consistent.

Example of how to propagate the error:

} else if (status.s == TransferStatusEnum::FAILED) {
    meta_.engine->freeBatchID(task.batchID);
    bool expected_err = false;
    if (task.tracker->errorSet.compare_exchange_strong(expected_err, true, std::memory_order_acq_rel)) {
        *task.tracker->errorMsg = "P2P send transfer failed.";
        task.tracker->completed->store(true, std::memory_order_release);
    }
    outstanding_tasks[i] = std::move(outstanding_tasks.back());
    outstanding_tasks.pop_back();
    didWork = true;
} else {

Comment on lines 1212 to 1298
void MooncakeBackend::processSendOp(const P2POp& op) {
    auto tensor = op.tensor;
    int dstRank = op.peerRank;
    const uint64_t numBytes =
        tensor.numel() * static_cast<uint64_t>(tensor.element_size());
    if (numBytes == 0) {
        op.completed->store(true, std::memory_order_release);
        return;
    }

    const uint32_t capacity = static_cast<uint32_t>(kP2PNumSlotsPerRank);
    const uint32_t numSlotsNeeded =
        static_cast<uint32_t>((numBytes + kP2PSlotSize - 1) / kP2PSlotSize);

    uint32_t curSlotOffset = 0;
    uint64_t bytesSent = 0;

    auto* local_ctrl = &p2p_ctrl_send_region_[dstRank];
    auto& remote_tail = local_ctrl->tail;
    uint32_t head = p2pSendLocalHead_[dstRank];

    const uint64_t sendAddrBase =
        meta_.segmentInfos[rank_].p2p_send_buffer +
        static_cast<uint64_t>(dstRank) * kP2PBytesPerRank;
    const uint64_t remoteRecvAddrBase =
        meta_.segmentInfos[dstRank].p2p_recv_buffer +
        static_cast<uint64_t>(rank_) * kP2PBytesPerRank;

    const auto* tensor_ptr = static_cast<const uint8_t*>(tensor.data_ptr());

    auto tracker = std::make_shared<P2PSendOpTracker>(
        static_cast<uint32_t>(numSlotsNeeded), op.completed, op.errorMsg);

    while (curSlotOffset < numSlotsNeeded) {
        // Send Phase1: Wait for available slot
        while (((head + 1) % capacity) == remote_tail) {
            PAUSE();
        }

        // Send Phase2: Data Copy
        const size_t chunkBytes =
            std::min(static_cast<size_t>(numBytes - bytesSent),
                     static_cast<size_t>(kP2PSlotSize));

        const uint64_t sendAddr =
            sendAddrBase + static_cast<uint64_t>(head) * kP2PSlotSize;
        void* sendBuf = reinterpret_cast<void*>(sendAddr);

        if (isCpu_) {
            std::memcpy(sendBuf, tensor_ptr + bytesSent, chunkBytes);
        } else {
            auto stream =
                at::cuda::getCurrentCUDAStream(tensor.device().index());
            auto err =
                cudaMemcpyAsync(sendBuf, tensor_ptr + bytesSent, chunkBytes,
                                cudaMemcpyDeviceToDevice, stream);
            TORCH_CHECK(!err, "P2P send cudaMemcpyAsync failed: ",
                        cudaGetErrorString(err));
            cudaStreamSynchronize(stream);
        }

        // Send Phase3: enqueue RDMA write task for ctrl worker
        {
            P2PSendTask task{
                .peerRank = dstRank,
                .peerSeq = p2pSendTaskSeq_[dstRank]++,
                .source = sendBuf,
                .target_id = meta_.segmentIDs[dstRank],
                .target_offset = remoteRecvAddrBase +
                                 static_cast<uint64_t>(head) * kP2PSlotSize,
                .length = chunkBytes,
                .tracker = tracker,
            };
            std::lock_guard<std::mutex> lock(p2pSendTaskQueueMutex_);
            p2pSendTaskQueue_.push_back(std::move(task));
        }
        p2pSendTaskQueueCv_.notify_one();

        // Send Phase4: Advance local head (ctrl head updates on completion)
        head = (head + 1) % capacity;
        p2pSendLocalHead_[dstRank] = head;
        bytesSent += chunkBytes;
        curSlotOffset += 1;
    }
}

high

If an exception occurs within processSendOp (e.g., from cudaMemcpyAsync), it's caught in p2PSendWorkerThread, which marks the operation as complete. However, the P2PSendOpTracker is not notified of the error because the errorSet flag is not set. The p2PCtrlWorkerThread might still be processing tasks for this operation and could incorrectly try to mark it as complete again later. To fix this potential race condition, wrap the main loop of processSendOp in a try-catch block to set the errorSet flag on the tracker before re-throwing the exception.
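A minimal, standalone sketch of the one-shot error-reporting pattern this comment suggests, assuming a tracker shaped like the fields referenced above (the struct and function names here are illustrative, not the PR's actual definitions):

```cpp
#include <atomic>
#include <memory>
#include <string>

// Hypothetical tracker mirroring the fields referenced in the review:
// an errorSet flag, a shared error message, and a shared completion flag.
struct P2PSendOpTracker {
    std::atomic<bool> errorSet{false};
    std::shared_ptr<std::string> errorMsg = std::make_shared<std::string>();
    std::shared_ptr<std::atomic<bool>> completed =
        std::make_shared<std::atomic<bool>>(false);
};

// The first caller to report an error wins the CAS and publishes the
// message; later reporters (e.g. the ctrl worker hitting the same failed
// op) see errorSet already true and do nothing, avoiding the double-
// completion race described above.
bool reportError(P2PSendOpTracker& t, const std::string& msg) {
    bool expected = false;
    if (t.errorSet.compare_exchange_strong(expected, true,
                                           std::memory_order_acq_rel)) {
        *t.errorMsg = msg;
        t.completed->store(true, std::memory_order_release);
        return true;
    }
    return false;
}
```

In processSendOp, a try-catch around the main loop could call `reportError` before re-throwing, so the ctrl worker and the send worker can never both mark the op complete.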

Comment on lines 793 to 811
if (p2p_send_buffer_) {
engine_.unregisterLocalMemory(p2p_send_buffer_);
if (isCpu_) {
free(p2p_send_buffer_);
} else {
cudaFree(p2p_send_buffer_);
}
p2p_send_buffer_ = nullptr;
}

if (p2p_recv_buffer_) {
engine_.unregisterLocalMemory(p2p_recv_buffer_);
if (isCpu_) {
free(p2p_recv_buffer_);
} else {
cudaFree(p2p_recv_buffer_);
}
p2p_recv_buffer_ = nullptr;
}

medium

The resource cleanup logic for p2p_send_buffer_ and p2p_recv_buffer_ is duplicated. This pattern is also used for p2p_ctrl_send_region_, p2p_ctrl_recv_region_, and other buffers in this function. Consider creating helper functions to encapsulate this logic, which would improve readability and maintainability by reducing code repetition.
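One possible shape for such a helper, sketched as a standalone template so it compiles without the backend (in the real code the callback would be `engine_.unregisterLocalMemory`, and the non-CPU branch would call `cudaFree`):

```cpp
#include <cstdlib>

// Hypothetical helper factoring out the repeated unregister/free/null
// pattern. `Unregister` stands in for engine_.unregisterLocalMemory.
template <typename Unregister>
void releaseBuffer(void*& buf, bool isCpu, Unregister unregister) {
    if (!buf) return;    // idempotent: safe to call twice
    unregister(buf);
    if (isCpu) {
        free(buf);
    } else {
        // cudaFree(buf) in the real backend; elided to keep this host-only.
    }
    buf = nullptr;
}
```

Each of the four buffer/region teardowns in shutdown() then collapses to a single call, and nulling the pointer inside the helper makes double-shutdown harmless.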

Comment on lines 1176 to 1183
while (true) {
    meta_.engine->getTransferStatus(headBatchID, 0, headStatus);
    if (headStatus.s == TransferStatusEnum::COMPLETED) {
        break;
    }
    TORCH_CHECK(headStatus.s != TransferStatusEnum::FAILED,
                "P2P ctrl head update failed.");
}

medium

This while(true) loop for polling transfer status is a busy-wait that can consume significant CPU while waiting for the RDMA operation to complete. To avoid this, consider adding a short sleep (e.g., std::this_thread::sleep_for(std::chrono::microseconds(1))) inside the loop to yield the CPU. This pattern is used in other parts of the codebase.

                while (true) {
                    meta_.engine->getTransferStatus(headBatchID, 0, headStatus);
                    if (headStatus.s == TransferStatusEnum::COMPLETED) {
                        break;
                    }
                    TORCH_CHECK(headStatus.s != TransferStatusEnum::FAILED,
                                "P2P ctrl head update failed.");
                    std::this_thread::sleep_for(std::chrono::microseconds(1));
                }

Comment on lines 1373 to 1380
while (true) {
meta_.engine->getTransferStatus(batchID, 0, status);
if (status.s == TransferStatusEnum::COMPLETED) {
break;
}
TORCH_CHECK(status.s != TransferStatusEnum::FAILED,
"P2P ctrl tail update failed.");
}

medium

This while(true) loop for polling transfer status is a busy-wait that can consume significant CPU. Consider adding a short sleep (e.g., std::this_thread::sleep_for(std::chrono::microseconds(1))) inside the loop to yield the CPU, which is a common pattern for such polling loops.

        while (true) {
            meta_.engine->getTransferStatus(batchID, 0, status);
            if (status.s == TransferStatusEnum::COMPLETED) {
                break;
            }
            TORCH_CHECK(status.s != TransferStatusEnum::FAILED,
                        "P2P ctrl tail update failed.");
            std::this_thread::sleep_for(std::chrono::microseconds(1));
        }

@codecov-commenter

⚠️ Please install the Codecov app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

✅ All modified and coverable lines are covered by tests.


@UNIDY2002 UNIDY2002 self-assigned this Feb 5, 2026
@UNIDY2002
Collaborator

Fantastic work! I'll try my best to start reviewing shortly.

@ykwd
Collaborator

ykwd commented Feb 5, 2026

Thanks for the great work! Communication performance is critical for elastic EP, and I believe this work is an important step toward making Mooncake EP truly practical and usable.


@staryxchen staryxchen left a comment


Could you provide some additional documentation explaining the design of the circular buffer? It appears to be quite complex to implement.

Comment on lines +139 to +140
uint32_t remaining;
std::shared_ptr<std::atomic<bool>> completed;

Should remaining also be atomic, like completed?
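For illustration: if completions for different chunks of one op can race, an atomic remaining counter lets the final decrement flip completed exactly once (a standalone sketch with hypothetical names, not the PR's actual tracker):

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical tracker: `remaining` counts outstanding chunks; the thread
// that performs the last decrement publishes completion.
struct Tracker {
    std::atomic<uint32_t> remaining;
    std::atomic<bool> completed{false};
    explicit Tracker(uint32_t n) : remaining(n) {}
};

// Returns true exactly once: on the decrement that drains the counter.
// fetch_sub returns the value *before* the subtraction, so seeing 1 means
// this call retired the final chunk.
bool onChunkDone(Tracker& t) {
    if (t.remaining.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        t.completed.store(true, std::memory_order_release);
        return true;
    }
    return false;
}
```

If only the single ctrl worker ever touches remaining, a plain integer suffices; atomic becomes necessary the moment a second thread can decrement it.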

{
    P2PSendTask task{
        .peerRank = dstRank,
        .peerSeq = p2pSendTaskSeq_[dstRank]++,

peerSeq is unused

PAUSE();
}
meta_.engine->freeBatchID(headBatchID);
++p2pSendTaskNext_[task.peerRank];

p2pSendTaskNext_ seems to be unused.

@yuechen-sys yuechen-sys closed this Feb 7, 2026
@yuechen-sys
Collaborator Author

Could you provide some additional documentation explaining the design of the circular buffer? It appears to be quite complex to implement.

Thank you for your review! The current implementation has architectural issues. I will submit a new PR shortly to address the concerns you raised.
