[PG] Implement circular buffer for efficient send/recv#1503
yuechen-sys wants to merge 4 commits into kvcache-ai:main from
Conversation
- Support large tensors via ring buffer
- Reduce extra sync to improve P2P perf
- Temporarily make P2P buffers independent
- Introduce control worker to submit transfers and update remote head
- Make send worker only copy data into slots
- Format code
Summary of Changes

Hello @yuechen-sys, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request delivers a substantial upgrade to the Mooncake backend's point-to-point communication capabilities, specifically targeting performance bottlenecks in large tensor transfers. By implementing a circular buffer architecture, the system can now handle data in chunks, bypassing previous synchronization overheads. A key architectural change is the decoupling of data transfer from control logic, assigning a dedicated worker to manage RDMA operations and remote state updates. This refactoring directly addresses performance issues and significantly improves the efficiency of distributed operations within the Mooncake framework.
Code Review
This pull request introduces a significant architectural change to the P2P communication in the Mooncake backend, replacing the previous mechanism with a more efficient circular buffer implementation. This is a great improvement that yields substantial performance gains as shown in the benchmarks. The decoupling of the data and control planes is well-executed. My review focuses on improving the robustness and maintainability of the new implementation. I've identified some areas for improvement in error handling, where failures could lead to crashes instead of being gracefully propagated. I've also pointed out a few busy-wait loops that could be optimized to reduce CPU consumption, and some code duplication that could be refactored for better maintainability.
} else if (status.s == TransferStatusEnum::FAILED) {
  meta_.engine->freeBatchID(task.batchID);
  TORCH_CHECK(false, "P2P send transfer failed.");
} else {
A failed RDMA transfer causes a TORCH_CHECK failure, which will terminate the process. This is inconsistent with the error handling in other parts of the code (like p2PSendWorkerThread) which propagate errors to the Work object. Consider propagating the error via the P2PSendOpTracker instead of crashing. This would make the error handling more robust and consistent.
Example of how to propagate the error:
} else if (status.s == TransferStatusEnum::FAILED) {
  meta_.engine->freeBatchID(task.batchID);
  bool expected_err = false;
  if (task.tracker->errorSet.compare_exchange_strong(expected_err, true, std::memory_order_acq_rel)) {
    *task.tracker->errorMsg = "P2P send transfer failed.";
    task.tracker->completed->store(true, std::memory_order_release);
  }
  outstanding_tasks[i] = std::move(outstanding_tasks.back());
  outstanding_tasks.pop_back();
  didWork = true;
} else {

void MooncakeBackend::processSendOp(const P2POp& op) {
  auto tensor = op.tensor;
  int dstRank = op.peerRank;

  const uint64_t numBytes =
      tensor.numel() * static_cast<uint64_t>(tensor.element_size());
  if (numBytes == 0) {
    op.completed->store(true, std::memory_order_release);
    return;
  }

  const uint32_t capacity = static_cast<uint32_t>(kP2PNumSlotsPerRank);
  const uint32_t numSlotsNeeded =
      static_cast<uint32_t>((numBytes + kP2PSlotSize - 1) / kP2PSlotSize);

  uint32_t curSlotOffset = 0;
  uint64_t bytesSent = 0;

  auto* local_ctrl = &p2p_ctrl_send_region_[dstRank];
  auto& remote_tail = local_ctrl->tail;
  uint32_t head = p2pSendLocalHead_[dstRank];

  const uint64_t sendAddrBase =
      meta_.segmentInfos[rank_].p2p_send_buffer +
      static_cast<uint64_t>(dstRank) * kP2PBytesPerRank;
  const uint64_t remoteRecvAddrBase =
      meta_.segmentInfos[dstRank].p2p_recv_buffer +
      static_cast<uint64_t>(rank_) * kP2PBytesPerRank;

  const auto* tensor_ptr = static_cast<const uint8_t*>(tensor.data_ptr());

  auto tracker = std::make_shared<P2PSendOpTracker>(
      static_cast<uint32_t>(numSlotsNeeded), op.completed, op.errorMsg);

  while (curSlotOffset < numSlotsNeeded) {
    // Send Phase1: Wait for available slot
    while (((head + 1) % capacity) == remote_tail) {
      PAUSE();
    }

    // Send Phase2: Data Copy
    const size_t chunkBytes =
        std::min(static_cast<size_t>(numBytes - bytesSent),
                 static_cast<size_t>(kP2PSlotSize));
    const uint64_t sendAddr =
        sendAddrBase + static_cast<uint64_t>(head) * kP2PSlotSize;
    void* sendBuf = reinterpret_cast<void*>(sendAddr);

    if (isCpu_) {
      std::memcpy(sendBuf, tensor_ptr + bytesSent, chunkBytes);
    } else {
      auto stream = at::cuda::getCurrentCUDAStream(tensor.device().index());
      auto err = cudaMemcpyAsync(sendBuf, tensor_ptr + bytesSent, chunkBytes,
                                 cudaMemcpyDeviceToDevice, stream);
      TORCH_CHECK(!err, "P2P send cudaMemcpyAsync failed: ",
                  cudaGetErrorString(err));
      cudaStreamSynchronize(stream);
    }

    // Send Phase3: enqueue RDMA write task for ctrl worker
    {
      P2PSendTask task{
          .peerRank = dstRank,
          .peerSeq = p2pSendTaskSeq_[dstRank]++,
          .source = sendBuf,
          .target_id = meta_.segmentIDs[dstRank],
          .target_offset = remoteRecvAddrBase +
                           static_cast<uint64_t>(head) * kP2PSlotSize,
          .length = chunkBytes,
          .tracker = tracker,
      };
      std::lock_guard<std::mutex> lock(p2pSendTaskQueueMutex_);
      p2pSendTaskQueue_.push_back(std::move(task));
    }
    p2pSendTaskQueueCv_.notify_one();

    // Send Phase4: Advance local head (ctrl head updates on completion)
    head = (head + 1) % capacity;
    p2pSendLocalHead_[dstRank] = head;
    bytesSent += chunkBytes;
    curSlotOffset += 1;
  }
}
If an exception occurs within processSendOp (e.g., from cudaMemcpyAsync), it's caught in p2PSendWorkerThread, which marks the operation as complete. However, the P2PSendOpTracker is not notified of the error because the errorSet flag is not set. The p2PCtrlWorkerThread might still be processing tasks for this operation and could incorrectly try to mark it as complete again later. To fix this potential race condition, wrap the main loop of processSendOp in a try-catch block to set the errorSet flag on the tracker before re-throwing the exception.
mooncake-pg/src/mooncake_backend.cpp (Outdated)
if (p2p_send_buffer_) {
  engine_.unregisterLocalMemory(p2p_send_buffer_);
  if (isCpu_) {
    free(p2p_send_buffer_);
  } else {
    cudaFree(p2p_send_buffer_);
  }
  p2p_send_buffer_ = nullptr;
}

if (p2p_recv_buffer_) {
  engine_.unregisterLocalMemory(p2p_recv_buffer_);
  if (isCpu_) {
    free(p2p_recv_buffer_);
  } else {
    cudaFree(p2p_recv_buffer_);
  }
  p2p_recv_buffer_ = nullptr;
}
The resource cleanup logic for p2p_send_buffer_ and p2p_recv_buffer_ is duplicated. This pattern is also used for p2p_ctrl_send_region_, p2p_ctrl_recv_region_, and other buffers in this function. Consider creating helper functions to encapsulate this logic, which would improve readability and maintainability by reducing code repetition.
while (true) {
  meta_.engine->getTransferStatus(headBatchID, 0, headStatus);
  if (headStatus.s == TransferStatusEnum::COMPLETED) {
    break;
  }
  TORCH_CHECK(headStatus.s != TransferStatusEnum::FAILED,
              "P2P ctrl head update failed.");
}
This while(true) loop for polling transfer status is a busy-wait that can consume significant CPU while waiting for the RDMA operation to complete. To avoid this, consider adding a short sleep (e.g., std::this_thread::sleep_for(std::chrono::microseconds(1))) inside the loop to yield the CPU. This pattern is used in other parts of the codebase.
while (true) {
  meta_.engine->getTransferStatus(headBatchID, 0, headStatus);
  if (headStatus.s == TransferStatusEnum::COMPLETED) {
    break;
  }
  TORCH_CHECK(headStatus.s != TransferStatusEnum::FAILED,
              "P2P ctrl head update failed.");
  std::this_thread::sleep_for(std::chrono::microseconds(1));
}

while (true) {
  meta_.engine->getTransferStatus(batchID, 0, status);
  if (status.s == TransferStatusEnum::COMPLETED) {
    break;
  }
  TORCH_CHECK(status.s != TransferStatusEnum::FAILED,
              "P2P ctrl tail update failed.");
}
This while(true) loop for polling transfer status is a busy-wait that can consume significant CPU. Consider adding a short sleep (e.g., std::this_thread::sleep_for(std::chrono::microseconds(1))) inside the loop to yield the CPU, which is a common pattern for such polling loops.
while (true) {
  meta_.engine->getTransferStatus(batchID, 0, status);
  if (status.s == TransferStatusEnum::COMPLETED) {
    break;
  }
  TORCH_CHECK(status.s != TransferStatusEnum::FAILED,
              "P2P ctrl tail update failed.");
  std::this_thread::sleep_for(std::chrono::microseconds(1));
}
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Fantastic work! I'll try my best to start reviewing shortly.
Thanks for the great work! Communication performance is critical for elastic EP, and I believe this work is an important step toward making Mooncake EP truly practical and usable.
staryxchen left a comment
Could you provide some additional documentation explaining the design of the circular buffer? It appears to be quite complex to implement.
uint32_t remaining;
std::shared_ptr<std::atomic<bool>> completed;
Should remaining also be atomic, like completed? If more than one thread can decrement it when slot transfers complete, a plain uint32_t is a data race.
{
  P2PSendTask task{
      .peerRank = dstRank,
      .peerSeq = p2pSendTaskSeq_[dstRank]++,
    PAUSE();
  }
  meta_.engine->freeBatchID(headBatchID);
  ++p2pSendTaskNext_[task.peerRank];
p2pSendTaskNext_ seems to be unused.
Thank you for your review! The current implementation has architectural issues. I will submit a new PR shortly to address the concerns you raised.
Description
This PR introduces a series of improvements to send/recv communication in the Mooncake backend. These changes address the performance issues reported in Issue #1421.
Changes
Future Plans
batch_isend_irecv is currently a work in progress.

Type of Change
How Has This Been Tested?
Send/receive performance is measured using a thorough in-house benchmark similar to nccl-tests. Small-message numbers are imprecise, since the Mooncake backend lacks a barrier.
Checklist
Ran ./scripts/code_format.sh before submitting.