[Store][Feature] Add CXL storage for mooncake_store#1365
[Store][Feature] Add CXL storage for mooncake_store#1365stmatengss merged 5 commits intokvcache-ai:mainfrom
Conversation
Summary of ChangesHello @qiuweit7, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces comprehensive support for CXL shared memory within the Mooncake Store. It enables the master service to efficiently manage and allocate CXL segments, leveraging CacheLib for memory management. The client-side interfaces are updated to transparently utilize CXL devices, ensuring that existing data manipulation APIs function correctly with this new high-speed memory. The changes also include new configuration options for CXL and a dedicated test suite to verify its integration and functionality. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces CXL storage support for mooncake_store, which is a significant new feature. The changes span across configuration, allocation strategies, client and master services, and the transfer engine to accommodate the CXL memory protocol. While the overall approach is sound, I've identified several critical syntax errors that will prevent compilation, as well as some logical issues in the new CXL allocation strategy. Additionally, a potential regression in hugepage support for non-CXL protocols was noted. The newly added integration tests for CXL are a great addition, but they should be made self-contained to improve portability and reliability.
| class CxlAllocationStrategy : public AllocationStrategy { | ||
| public: | ||
| CxlAllocationStrategy() = default; | ||
| tl::expected<std::vector<Replica>, ErrorCode> Allocate( | ||
| const AllocatorManager& allocator_manager, const size_t slice_length, | ||
| const size_t replica_num = 1, | ||
| const std::vector<std::string>& preferred_segments = | ||
| std::vector<std::string>(), | ||
| const std::set<std::string>& excluded_segments = | ||
| std::set<std::string>()) { | ||
| if (slice_length == 0 || replica_num == 0) { | ||
| return tl::make_unexpected(ErrorCode::INVALID_PARAMS); | ||
| } | ||
|
|
||
| if (preferred_segments.empty()) { | ||
| LOG(ERROR) << "Preferred_segments is empty."; | ||
| return tl::make_unexpected(ErrorCode::INVALID_PARAMS); | ||
| } | ||
|
|
||
| const std::string& cxl_segment_name = preferred_segments[0]; | ||
| // if (excluded_segments.contains(cxl_segment_name)) { | ||
| // return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| // } | ||
| VLOG(1) << "Do cxl allocate, overwriten segment=" << cxl_segment_name; | ||
|
|
||
| const auto cxl_allocators = allocator_manager.getAllocators(cxl_segment_name); | ||
|
|
||
| if (cxl_allocators == nullptr || cxl_allocators->size() == 0) { | ||
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| } | ||
| std::shared_ptr<BufferAllocatorBase> cxl_allocator = (*cxl_allocators)[0]; | ||
| if (!cxl_allocator) { | ||
| LOG(ERROR) << "No CXL allocator in preferred_segment"; | ||
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| } | ||
|
|
||
| std::vector<Replica> replicas; | ||
| replicas.reserve(replica_num); | ||
|
|
||
| auto buffer = cxl_allocator->allocate(slice_length); | ||
| if(buffer){ | ||
| buffer->change_to_cxl(cxl_segment_name); | ||
| } | ||
| replicas.emplace_back(std::move(buffer), | ||
| ReplicaStatus::PROCESSING); | ||
|
|
||
| if (replicas.empty()) { | ||
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| } | ||
| VLOG(1) << "Successfully allocated " << replicas.size() << " CXL replica."; | ||
| return replicas; | ||
| } | ||
| }; |
There was a problem hiding this comment.
| MasterServiceConfigBuilder& set_processing_task_timeout_sec(uint64_t sec) { | ||
| processing_task_timeout_sec_ = sec; | ||
| MasterServiceConfigBuilder& set_cxl_path(const std::string& path) { |
There was a problem hiding this comment.
There's a syntax error here. The method set_processing_task_timeout_sec is missing its closing brace } and a return *this; statement. This will cause a compilation error.
MasterServiceConfigBuilder& set_processing_task_timeout_sec(uint64_t sec) {
processing_task_timeout_sec_ = sec;
return *this;
}
MasterServiceConfigBuilder& set_cxl_path(const std::string& path) {| task_manager_(config.task_manager_config) { | ||
| cxl_path_(config.cxl_path), | ||
| cxl_size_(config.cxl_size), | ||
| enable_cxl_(config.enable_cxl) { |
There was a problem hiding this comment.
There is a syntax error in the constructor's member initializer list. The CXL-related members are being initialized inside the constructor body instead of the initializer list, which is incorrect syntax.
| task_manager_(config.task_manager_config) { | |
| cxl_path_(config.cxl_path), | |
| cxl_size_(config.cxl_size), | |
| enable_cxl_(config.enable_cxl) { | |
| task_manager_(config.task_manager_config), | |
| cxl_path_(config.cxl_path), | |
| cxl_size_(config.cxl_size), | |
| enable_cxl_(config.enable_cxl) { |
| auto buffer = cxl_allocator->allocate(slice_length); | ||
| if(buffer){ | ||
| buffer->change_to_cxl(cxl_segment_name); | ||
| } | ||
| replicas.emplace_back(std::move(buffer), | ||
| ReplicaStatus::PROCESSING); | ||
|
|
||
| if (replicas.empty()) { | ||
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| } | ||
| VLOG(1) << "Successfully allocated " << replicas.size() << " CXL replica."; | ||
| return replicas; |
There was a problem hiding this comment.
The logic for handling allocation failure and replica_num seems incorrect:
- The
replica_numparameter is not used to create multiple replicas. The code always creates one replica. Ifreplica_numis intended to be always 1 for CXL, there should be an assertion or check. - If
cxl_allocator->allocate(slice_length)returns a null pointer, aReplicais still created with a null buffer and pushed intoreplicas. The function then returns a vector containing an invalid replica, which is likely a bug. - The check
if (replicas.empty())is redundant as aReplicais always pushed into the vector.
The suggested change corrects the handling of allocation failures.
| auto buffer = cxl_allocator->allocate(slice_length); | |
| if(buffer){ | |
| buffer->change_to_cxl(cxl_segment_name); | |
| } | |
| replicas.emplace_back(std::move(buffer), | |
| ReplicaStatus::PROCESSING); | |
| if (replicas.empty()) { | |
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | |
| } | |
| VLOG(1) << "Successfully allocated " << replicas.size() << " CXL replica."; | |
| return replicas; | |
| auto buffer = cxl_allocator->allocate(slice_length); | |
| if (!buffer) { | |
| return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | |
| } | |
| buffer->change_to_cxl(cxl_segment_name); | |
| replicas.emplace_back(std::move(buffer), | |
| ReplicaStatus::PROCESSING); | |
| VLOG(1) << "Successfully allocated " << replicas.size() << " CXL replica."; | |
| return replicas; |
| } else { | ||
| auto max_mr_size = globalConfig().max_mr_size; // Max segment size | ||
| uint64_t total_glbseg_size = global_segment_size; // For logging | ||
| uint64_t current_glbseg_size = 0; // For logging | ||
| while (global_segment_size > 0) { | ||
| size_t segment_size = std::min(global_segment_size, max_mr_size); | ||
| global_segment_size -= segment_size; | ||
| current_glbseg_size += segment_size; | ||
| LOG(INFO) << "Mounting segment: " << segment_size << " bytes, " | ||
| << current_glbseg_size << " of " << total_glbseg_size; | ||
| void *ptr =allocate_buffer_allocator_memory(segment_size, this->protocol); | ||
| if (!ptr) { | ||
| LOG(ERROR) << "Failed to allocate segment memory"; | ||
| return tl::unexpected(ErrorCode::INVALID_PARAMS); | ||
| } | ||
| if (this->protocol == "ascend") { | ||
| ascend_segment_ptrs_.emplace_back(ptr); | ||
| } else { | ||
| segment_ptrs_.emplace_back(ptr); | ||
| } | ||
| auto mount_result = client_->MountSegment(ptr, segment_size, protocol); | ||
| if (!mount_result.has_value()) { | ||
| LOG(ERROR) << "Failed to mount segment: " | ||
| << toString(mount_result.error()); | ||
| return tl::unexpected(mount_result.error()); | ||
| } | ||
| } | ||
| if (total_glbseg_size == 0) { | ||
| LOG(INFO) << "Global segment size is 0, skip mounting segment"; | ||
| } | ||
| } |
There was a problem hiding this comment.
The refactoring of setup_internal seems to have removed support for huge pages for non-CXL protocols. The original code had logic to handle should_use_hugepage, but it's missing in the new version. This might be an unintentional regression. If huge page support is still desired for non-CXL protocols, the logic should be restored.
| DEFINE_string(cxl_device_name, "/dev/dax0.0", "Device name for cxl"); | ||
| DEFINE_uint64(cxl_device_size, 8589934592, "Device Size for cxl"); | ||
| DEFINE_bool(auto_disc, false, "Auto discover tcp devices"); | ||
| DEFINE_string(transfer_engine_metadata_url, "etcd://10.130.5.132:2379", "Metadata connection string for transfer engine"); |
There was a problem hiding this comment.
The new test cxl_client_integration_test.cpp contains hardcoded IP addresses for services like etcd. This makes the test dependent on an external environment and not portable. It would be better to make the test self-contained, for example by using InProcMaster as done in client_integration_test.cpp to run a master service within the test process.
|
Good feature! |
9f4fa4b to
6366b88
Compare
| void setAutoDiscover(bool auto_discover); | ||
|
|
||
| void* getBaseAddr(); | ||
| // std::shared_ptr<TransferMetadata> getMetadata() { return metadata_; } |
There was a problem hiding this comment.
Removing this comment is better.
6366b88 to
fedec08
Compare
|
Thanks for this work. Could you share more details about the system design? At the moment, most reviewers do not have access to CXL memory, so they are unable to run tests. When reviewing the code alone, there are many points that are confusing. For example, is there only a single global segment, and why does the client need to mount a segment? |
There was a problem hiding this comment.
Pull request overview
This PR adds CXL (Compute Express Link) storage support to Mooncake Store, enabling shared memory-backed storage with centralized segment management. The Master service manages CXL allocation using CacheLib, while clients can access CXL devices through Put/Get and BatchPut/BatchGet APIs.
Changes:
- Added CXL transport integration in the transfer engine layer with base address retrieval
- Implemented CXL-specific allocation strategy and segment management on the Master
- Extended Client API to support CXL protocol with segment mounting and data operations
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| mooncake-transfer-engine/src/transfer_engine.cpp | Added getBaseAddr() method for CXL base address retrieval |
| mooncake-transfer-engine/src/multi_transport.cpp | Implemented CXL transport base address extraction |
| mooncake-transfer-engine/include/transfer_engine_impl.h | Exposed getBaseAddr() through implementation layer |
| mooncake-transfer-engine/include/transfer_engine.h | Added public getBaseAddr() API |
| mooncake-transfer-engine/include/multi_transport.h | Declared getBaseAddr() method |
| mooncake-store/tests/cxl_client_integration_test.cpp | Added integration tests for CXL client operations |
| mooncake-store/tests/client_integration_test.cpp | Updated MountSegment calls to include protocol parameter |
| mooncake-store/tests/CMakeLists.txt | Registered new CXL integration test |
| mooncake-store/src/segment.cpp | Implemented CXL segment mounting and global allocator initialization |
| mooncake-store/src/real_client.cpp | Added CXL-specific segment setup logic |
| mooncake-store/src/master_service.cpp | Integrated CXL allocator and strategy initialization |
| mooncake-store/src/master.cpp | Added CXL configuration flags and initialization |
| mooncake-store/src/client_service.cpp | Added CXL transport installation and base address retrieval |
| mooncake-store/src/allocator.cpp | Implemented CXL offset handling and deallocation logic |
| mooncake-store/include/types.h | Added CXL constants and protocol field to Segment |
| mooncake-store/include/segment.h | Added CXL allocator support to SegmentManager |
| mooncake-store/include/master_service.h | Added CXL configuration fields |
| mooncake-store/include/master_config.h | Extended configuration structures with CXL parameters |
| mooncake-store/include/client_service.h | Updated MountSegment signature and added GetBaseAddr() |
| mooncake-store/include/allocator.h | Added CXL conversion methods and protocol field to AllocatedBuffer |
| mooncake-store/include/allocation_strategy.h | Implemented CxlAllocationStrategy |
| extern/pybind11 | Updated pybind11 submodule commit |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| void setAutoDiscover(bool auto_discover); | ||
|
|
||
| void* getBaseAddr(); | ||
| // std::shared_ptr<TransferMetadata> getMetadata() { return metadata_; } |
There was a problem hiding this comment.
Commented-out code should be removed rather than left in the codebase. If this functionality is planned for future use, track it in an issue instead.
| // std::shared_ptr<TransferMetadata> getMetadata() { return metadata_; } |
| // std::this_thread::sleep_for(std::chrono::seconds(30)); | ||
|
|
There was a problem hiding this comment.
Remove commented-out debug code before merging. If this sleep is needed for debugging, consider using a configurable flag instead.
| // std::this_thread::sleep_for(std::chrono::seconds(30)); |
| FLAGS_transfer_engine_metadata_url, // Metadata connection string | ||
| FLAGS_protocol, // Transfer protocol | ||
| std::nullopt, // RDMA device names (auto-discovery) | ||
| // master_address_ // Master server address (non-HA) |
There was a problem hiding this comment.
Remove commented-out code blocks. If these represent alternative configuration approaches, document them separately or remove entirely.
| // master_address_ // Master server address (non-HA) |
| // Start an in-process non-HA master without HTTP metadata server | ||
| // ASSERT_TRUE(master_.Start(InProcMasterConfigBuilder().build())); | ||
| // master_address_ = master_.master_address(); | ||
| // metadata_url_ = master_.metadata_url(); | ||
| // LOG(INFO) << "Started in-proc master at " << master_address_ | ||
| // << ", metadata=P2PHANDSHAKE"; |
There was a problem hiding this comment.
Remove commented-out code blocks. If these represent alternative configuration approaches, document them separately or remove entirely.
| // Start an in-process non-HA master without HTTP metadata server | |
| // ASSERT_TRUE(master_.Start(InProcMasterConfigBuilder().build())); | |
| // master_address_ = master_.master_address(); | |
| // metadata_url_ = master_.metadata_url(); | |
| // LOG(INFO) << "Started in-proc master at " << master_address_ | |
| // << ", metadata=P2PHANDSHAKE"; | |
| // Tests expect an already running master/metadata service configured via flags. |
mooncake-store/include/segment.h
Outdated
| #include "allocation_strategy.h" | ||
| #include "allocator.h" | ||
| #include "types.h" | ||
| // #include "master_config.h" |
There was a problem hiding this comment.
Remove commented-out include directive. If the include is not needed, it should be deleted completely.
| // #include "master_config.h" |
| // if (excluded_segments.contains(cxl_segment_name)) { | ||
| // return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | ||
| // } | ||
| VLOG(1) << "Do cxl allocate, overwritten segment=" << cxl_segment_name; | ||
|
|
There was a problem hiding this comment.
Remove commented-out code. If excluding segments functionality is planned for future implementation, add a TODO comment or track it in an issue instead.
| // if (excluded_segments.contains(cxl_segment_name)) { | |
| // return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE); | |
| // } | |
| VLOG(1) << "Do cxl allocate, overwritten segment=" << cxl_segment_name; | |
| // TODO: If excluded_segments support is implemented for CXL allocation, | |
| // ensure cxl_segment_name is not excluded before proceeding. | |
| VLOG(1) << "Do cxl allocate, overwritten segment=" << cxl_segment_name; |
mooncake-store/src/allocator.cpp
Outdated
| } | ||
|
|
||
| void AllocatedBuffer::change_to_cxl(std::string client_segment_name) { | ||
| u_int64_t offset_raw = reinterpret_cast<uintptr_t>(buffer_ptr_); |
There was a problem hiding this comment.
Use standard type 'uint64_t' instead of non-standard 'u_int64_t' for consistency with the rest of the codebase.
mooncake-store/src/allocator.cpp
Outdated
| } | ||
|
|
||
| void* AllocatedBuffer::get_vaddr_from_cxl() { | ||
| u_int64_t offset_raw = reinterpret_cast<uintptr_t>(buffer_ptr_); |
There was a problem hiding this comment.
Use standard type 'uint64_t' instead of non-standard 'u_int64_t' for consistency with the rest of the codebase.
Thank you for your valuable feedback and questions! Here is a detailed introduction to the system design to address the points you raised: |
| LOG(FATAL) << "MC_CXL_DEV_SIZE not set"; | ||
| return tl::unexpected(ErrorCode::INVALID_PARAMS); | ||
| } | ||
| if (this->protocol == "ascend") { | ||
| ascend_segment_ptrs_.emplace_back(ptr); | ||
| } else if (should_use_hugepage) { | ||
| hugepage_segment_ptrs_.emplace_back( | ||
| ptr, HugepageSegmentDeleter{mapped_size}); | ||
| } else { | ||
| segment_ptrs_.emplace_back(ptr); | ||
| } | ||
| auto mount_result = client_->MountSegment(ptr, mapped_size); | ||
|
|
||
| void *ptr = client_->GetBaseAddr(); | ||
| LOG(INFO) << "Mounting CXL segment: " << cxl_dev_size << " bytes, " | ||
| << ptr; | ||
| auto mount_result = client_->MountSegment(ptr, cxl_dev_size, protocol); | ||
| if (!mount_result.has_value()) { | ||
| LOG(ERROR) << "Failed to mount segment: " | ||
| << toString(mount_result.error()); | ||
| return tl::unexpected(mount_result.error()); | ||
| } | ||
| } | ||
| if (total_glbseg_size == 0) { | ||
| LOG(INFO) << "Global segment size is 0, skip mounting segment"; | ||
|
|
||
| } else { | ||
| auto max_mr_size = globalConfig().max_mr_size; // Max segment size | ||
| uint64_t total_glbseg_size = global_segment_size; // For logging | ||
| uint64_t current_glbseg_size = 0; // For logging | ||
| while (global_segment_size > 0) { | ||
| size_t segment_size = std::min(global_segment_size, max_mr_size); | ||
| global_segment_size -= segment_size; | ||
| current_glbseg_size += segment_size; | ||
| LOG(INFO) << "Mounting segment: " << segment_size << " bytes, " | ||
| << current_glbseg_size << " of " << total_glbseg_size; | ||
|
|
||
| size_t mapped_size = segment_size; | ||
| void *ptr = nullptr; | ||
| if (should_use_hugepage) { | ||
| mapped_size = | ||
| align_up(segment_size, get_hugepage_size_from_env()); | ||
| ptr = allocate_buffer_mmap_memory(mapped_size, |
fedec08 to
b2a67bc
Compare
b2a67bc to
8e49e3f
Compare
d705b26 to
6926deb
Compare
|
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
a793cd3 to
63bd273
Compare
| option(USE_UBSHMEM "option for using ascend npu with shmem" OFF) | ||
| option(USE_ASCEND_HETEROGENEOUS "option for transferring between ascend npu and gpu" OFF) | ||
| option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF) | ||
| option(USE_CXL "option for using CXL protocol" OFF) |
There was a problem hiding this comment.
We don't need to enable it for all scenarios. Just test it in CI.
c51d096 to
260605c
Compare
|
|
||
| // From WrappedMasterServiceConfig | ||
| MasterServiceConfig(const WrappedMasterServiceConfig& config) { | ||
| auto cxl_allocator_type = BufferAllocatorType::CACHELIB; |
There was a problem hiding this comment.
For CXL, we must use CACHELIB?
There was a problem hiding this comment.
No, using CACHELIB for CXL is just a temporary design choice.
mooncake-store/src/segment.cpp
Outdated
| VLOG(1) << "Init CXL global allocator."; | ||
| VLOG(1) << "[CXL] create allocator with " | ||
| << "path=" << cxl_path << " base=0x" << std::hex << DEFAULT_CXL_BASE | ||
| << std::dec << " size=" << cxl_size << " (" << std::fixed | ||
| << std::setprecision(2) << cxl_size / (1024.0 * 1024 * 1024) | ||
| << " GB)"; |
There was a problem hiding this comment.
Could you use LOG(INFO) instead?
e7b2e37 to
ba6a497
Compare
ba6a497 to
eb65e35
Compare
* [Store] feat: add cxl storage for mooncake store * Update extern/pybind11 to match main * fix: use fake cxl device to bypass ci-test error * Fix code formatting in segment.cpp --------- Co-authored-by: Teng Ma <sima.mt@alibaba-inc.com>

Description
This is the first PR for the CXL feature.
It introduces Mooncake storage backed by CXL shared memory: the Master centrally manages and allocates CXL segments, currently using CacheLib as the default allocator. Clients can read and write the CXL device through the Put/Get APIs, and BatchPut/BatchGet interfaces are also provided.
Type of Change
How Has This Been Tested?
Checklist