[Store] Implement Metadata Persistence and Recovery for Master Service#1431

Open
yangdao479 wants to merge 60 commits into main from dev/master-kv-ha

Conversation


@yangdao479 commented Jan 23, 2026

Description

This PR implements a snapshot and restore mechanism for the Master Service.
Related RFC: #1150

Implementation

Snapshot Generation:

  • Uses fork() with copy-on-write (COW) for efficient memory snapshots, minimizing the time business operations are blocked
  • Periodic background snapshot thread with configurable interval
  • Serializes Master Service metadata with zstd compression
  • Uses pipe-based logging to avoid potential deadlocks caused by sharing glog state between the snapshot parent and child processes
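The bullets above can be sketched as follows. This is an illustrative sketch under assumptions, not the PR's actual code: the snapshot thread takes an exclusive lock only for the duration of fork(), the child then serializes the COW-frozen state and exits, and the parent resumes serving requests (the pipe-based logging and zstd compression are omitted here; `g_snapshot_mutex`, `take_snapshot`, and the plain-file output are hypothetical stand-ins).

```cpp
#include <sys/wait.h>
#include <unistd.h>

#include <cassert>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <string>

std::shared_mutex g_snapshot_mutex;  // hypothetical stand-in for SNAPSHOT_MUTEX

// Forks a child that writes `state` to `path`; returns true on success.
bool take_snapshot(const std::string& state, const std::string& path) {
    pid_t pid;
    {
        // Block mutating operations only while fork() duplicates the
        // address space; after that, COW gives the child a frozen view.
        std::unique_lock<std::shared_mutex> lock(g_snapshot_mutex);
        pid = fork();
    }
    if (pid < 0) return false;
    if (pid == 0) {
        // Child: serialize the consistent point-in-time state.
        std::ofstream out(path, std::ios::binary);
        out << state;
        out.close();
        _exit(out.good() ? 0 : 1);  // _exit: skip parent's atexit handlers
    }
    // Parent: the real implementation would poll with a configurable timeout.
    int status = 0;
    waitpid(pid, &status, 0);
    return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```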

Snapshot Recovery:

  • Deserializes snapshot files to restore complete Master state
  • Cleans up incomplete metadata (replicas not in COMPLETE status, expired leases)
  • Unmounts segments that are not in ready status
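The cleanup step described above might look roughly like this. A minimal sketch with hypothetical types (`ReplicaStatus`, `ObjectMeta`, `prune_after_restore` are illustrative, not the PR's actual names): after a restore, metadata that cannot be trusted is pruned, namely replicas that were not COMPLETE at snapshot time and objects whose lease has already expired.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <string>

enum class ReplicaStatus { PROCESSING, COMPLETE };

struct ObjectMeta {
    ReplicaStatus status;
    std::chrono::system_clock::time_point lease_expiry;
};

// Removes entries that are incomplete or whose lease expired before `now`;
// returns the number of pruned entries.
std::size_t prune_after_restore(std::map<std::string, ObjectMeta>& metadata,
                                std::chrono::system_clock::time_point now) {
    std::size_t pruned = 0;
    for (auto it = metadata.begin(); it != metadata.end();) {
        const bool incomplete = it->second.status != ReplicaStatus::COMPLETE;
        const bool expired = it->second.lease_expiry <= now;
        if (incomplete || expired) {
            it = metadata.erase(it);  // erase returns the next valid iterator
            ++pruned;
        } else {
            ++it;
        }
    }
    return pruned;
}
```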

Storage Backends:

  • Local file system (default)
  • S3 storage (optional, requires AWS SDK)
  • etcd storage (planned for a subsequent PR; it will become the default backend in the future)

Known Limitations

Design Limitations:

  • Since snapshots are taken periodically, data modifications made between two snapshots are lost if the Master fails before the next snapshot

Implementation Limitations:

  • etcd snapshot storage backend will be provided in a follow-up PR

Acknowledgements

Special thanks to iFLYTEK for designing and implementing this feature.

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Run the following test binaries:

  • ./build/mooncake-store/tests/master_service_test_for_snapshot
  • ./build/mooncake-store/tests/master_service_ssd_test_for_snapshot

These tests reuse the existing master_service_test and master_service_ssd_test cases. After each test case completes, a snapshot/restore cycle creates a new Master, and the following validations are then executed:

  • Binary consistency: Compare snapshot files from the restored Master with the original
  • API consistency: Verify metadata state via APIs (e.g., GetAllKeys, GetAllSegments, GetReplicaList)
  • Allocator consistency: Call PutStart on both Masters to verify consistent allocation behavior

Note:

  • During testing, an environment variable is used to disable dirty data cleanup during restore to ensure accurate state comparison.
  • The S3 storage backend implementation has not been tested. The etcd backend will be prioritized in future development.

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

zhunzhong2 and others added 24 commits December 17, 2025 10:47
…dic persistence:

- Add snapshot configuration options, including snapshot enablement, snapshot interval, timeout duration, etc.
- Implement the snapshot thread function to periodically fork child processes for state persistence.
- Add snapshot serialization logic to support serializing metadata and segment information to S3.
- Introduce a file utility class that supports chunked writing of large files and automatic directory creation.
- Add an S3 helper class to implement multipart upload and object deletion functionalities.
- Add new snapshot-related metric monitoring, such as statistics on snapshot duration, success/failure counts.
- Add support for cleaning up old snapshots, retaining a specified number of historical snapshots.
- Modify the time retrieval method to uniformly use system_clock instead of steady_clock.
- Add a snapshot recovery flag to control and support data recovery from snapshots.
- Introduce the zstd compression library dependency and integrate it into the build system.
* [Store]: Abstract snapshot storage backend and implement local file backend

* [Store]: fix bugs in master service snapshot and restore

* [Store]: Add missing fields for snapshot/restore functionality

- Support DISK and LOCAL_DISK replica types in snapshot/restore
- Add allocator_names to preserve allocator ordering
- Add client_local_disk_segment_ snapshot/restore support
- Add client_by_name_ reconstruction on restore

* [Store]: Add snapshot/restore tests for MasterService

- Add test base class for snapshot/restore verification
- Port tests from master_service_test.cpp for snapshot/restore verification
- Port tests from master_service_ssd_test.cpp for SSD snapshot/restore verification
- Sort unordered_map keys before serialization for deterministic snapshots

* [Store]: translate comments into English
@gemini-code-assist
Contributor

Summary of Changes

Hello @yangdao479, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a snapshot and restore feature to the Mooncake Store's Master Service, enabling periodic backups of metadata and recovery from these snapshots to reduce restart times. It includes support for local and S3 storage backends, along with necessary configurations and testing.

Highlights

  • Snapshot and Restore Mechanism: Implements a snapshot and restore mechanism for Master Service metadata, enhancing recovery and reducing cache warm-up time after restarts.
  • Efficient Snapshot Generation: Utilizes Fork with Copy-on-Write (COW) for memory snapshots, minimizing blocking time on business operations and using zstd compression for serialization.
  • Storage Backend Flexibility: Supports local file system (default) and S3 storage (optional, requires AWS SDK) for snapshot storage, with etcd planned for future implementation.
  • Configuration Options: Introduces new flags and configurations for enabling snapshotting, setting snapshot intervals, and choosing storage backends.

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces snapshot and restore functionality to the Mooncake Store, allowing periodic snapshots of Master metadata and recovery from them to reduce cache warm-up time after a Master restart. The changes add new configuration flags, define snapshot storage backend types (local or S3), implement serialization and deserialization logic for metadata and segments, and create a snapshot thread that periodically persists the Master state. The review highlights the importance of the snapshot mutex for data consistency and the security checks in the local file backend. It also notes the use of a custom binary format within MessagePack for serialization, suggesting a more MessagePack-idiomatic approach for better maintainability, and points out the use of a non-thread-safe function for timestamp formatting, recommending thread-safe alternatives.
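On the timestamp point raised in the summary, here is an illustrative example (not the PR's code; `format_timestamp_utc` and the format string are hypothetical): std::localtime and std::gmtime return a pointer to shared static storage, so concurrent callers can race, whereas POSIX gmtime_r (or localtime_r) writes into a caller-supplied struct tm.

```cpp
#include <cassert>
#include <cstdio>
#include <ctime>
#include <string>

// Formats an epoch timestamp as UTC "YYYYMMDD-HHMMSS" using the
// thread-safe gmtime_r variant (fills a caller-owned struct tm).
std::string format_timestamp_utc(std::time_t t) {
    std::tm tm_buf{};
    gmtime_r(&t, &tm_buf);  // no shared static buffer, unlike std::gmtime
    char buf[32];
    std::snprintf(buf, sizeof(buf), "%04d%02d%02d-%02d%02d%02d",
                  tm_buf.tm_year + 1900, tm_buf.tm_mon + 1, tm_buf.tm_mday,
                  tm_buf.tm_hour, tm_buf.tm_min, tm_buf.tm_sec);
    return buf;
}
```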

Comment on lines 1790 to 1794
std::unique_lock<std::shared_mutex> lock(SNAPSHOT_MUTEX);
LOG(INFO) << "[Snapshot] Locking snapshot mutex, snapshot_id="
<< snapshot_id;
pid = fork();
}

high

The fork() call is correctly wrapped in a std::unique_lock on SNAPSHOT_MUTEX. This is critical to prevent any modifications to the parent process's memory during the fork() operation, ensuring that the child process gets a consistent snapshot of the memory for copy-on-write.

Comment on lines +274 to +372
const std::string& prefix) {
std::string full_path = KeyToPath(prefix);

try {
// Security check: ensure path to delete is within base_path_
std::error_code ec;
fs::path canonical_base = fs::canonical(base_path_, ec);
if (ec) {
// base_path_ doesn't exist, try to create it and get canonical path
fs::create_directories(base_path_);
canonical_base = fs::canonical(base_path_, ec);
if (ec) {
return tl::make_unexpected(
fmt::format("Failed to resolve base path {}: {}",
base_path_, ec.message()));
}
}

// Check if target path exists
if (!fs::exists(full_path)) {
// Path doesn't exist, treat as successful deletion
return {};
}

fs::path canonical_target = fs::canonical(full_path, ec);
if (ec) {
return tl::make_unexpected(
fmt::format("Failed to resolve target path {}: {}", full_path,
ec.message()));
}

// Verify target path is within base_path_
std::string base_str = canonical_base.string();
std::string target_str = canonical_target.string();

// Ensure target path starts with base_path_
if (target_str.length() < base_str.length() ||
target_str.substr(0, base_str.length()) != base_str) {
LOG(ERROR) << "Security violation: Attempted to delete path "
"outside base directory. "
<< "base_path=" << base_str
<< ", target_path=" << target_str;
return tl::make_unexpected(fmt::format(
"Security error: Path {} is outside base directory {}",
full_path, base_path_));
}

// Don't allow deleting base_path_ itself
if (target_str == base_str) {
LOG(ERROR) << "Security violation: Attempted to delete base "
"directory itself. "
<< "base_path=" << base_str;
return tl::make_unexpected(fmt::format(
"Security error: Cannot delete base directory {}", base_path_));
}

// Security check passed, execute delete operation
if (fs::is_directory(full_path)) {
auto removed_count = fs::remove_all(full_path, ec);
if (ec) {
return tl::make_unexpected(
fmt::format("Failed to remove directory {}: {}", full_path,
ec.message()));
}
VLOG(1) << "Removed directory: " << full_path
<< ", items removed: " << removed_count;
} else {
// If it's a file, delete all files matching the prefix
fs::path parent_dir = fs::path(full_path).parent_path();
std::string prefix_name = fs::path(full_path).filename().string();

// Verify parent directory is also within base_path_
fs::path canonical_parent = fs::canonical(parent_dir, ec);
if (ec || canonical_parent.string().substr(0, base_str.length()) !=
base_str) {
return tl::make_unexpected(fmt::format(
"Security error: Parent path {} is outside base directory",
parent_dir.string()));
}

if (fs::exists(parent_dir) && fs::is_directory(parent_dir)) {
for (const auto& entry : fs::directory_iterator(parent_dir)) {
std::string filename = entry.path().filename().string();
if (filename.find(prefix_name) == 0) {
fs::remove_all(entry.path(), ec);
if (ec) {
LOG(WARNING) << "Failed to remove: " << entry.path()
<< ", error: " << ec.message();
}
}
}
}
}

return {};
} catch (const fs::filesystem_error& e) {
return tl::make_unexpected(
fmt::format("Filesystem error during delete: {}", e.what()));
}

high

The LocalFileBackend::DeleteObjectsWithPrefix method includes robust security checks (fs::canonical, path prefix comparison) to prevent accidental or malicious deletion of files outside the designated base_path_ or the base_path_ itself. This is a critical security measure for file system operations.

return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
}

std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);

medium

Acquiring a std::shared_lock on SNAPSHOT_MUTEX for RemoveByRegex is important to ensure that the metadata being iterated over for regex matching and removal is consistent during a snapshot.


tl::expected<void, ErrorCode> MasterService::MoveRevoke(
const UUID& client_id, const std::string& key) {
std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);

medium

Acquiring a std::shared_lock on SNAPSHOT_MUTEX for MoveRevoke is important to ensure that the replica removal and replication task removal are consistent with any ongoing snapshot operations.

-> tl::expected<void, ErrorCode> {
size_t metrics_dec_capacity = 0; // to update the metrics

std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);

medium

Acquiring a std::shared_lock on SNAPSHOT_MUTEX for UnmountSegment is necessary to prevent race conditions with snapshot operations that might be trying to read the segment state.
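The locking convention these review comments keep pointing at can be demonstrated in isolation. A minimal sketch, assuming the convention described above (`snapshot_mutex`, `business_op`, and `snapshot_window` are illustrative names): ordinary mutating operations take SNAPSHOT_MUTEX in shared mode, so they may run concurrently with each other, while the snapshot thread takes it in exclusive mode around fork(), briefly quiescing all mutations.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

std::shared_mutex snapshot_mutex;  // stand-in for SNAPSHOT_MUTEX
int counter = 0;                   // stand-in for master metadata
std::mutex counter_mutex;          // fine-grained lock the ops still need

void business_op() {
    // Shared: many ops proceed in parallel, but none overlap the
    // exclusive "fork window" below.
    std::shared_lock<std::shared_mutex> guard(snapshot_mutex);
    std::lock_guard<std::mutex> g(counter_mutex);
    ++counter;
}

int snapshot_window() {
    // Exclusive: no business_op holds the shared lock while we would fork,
    // so the value read is a consistent point-in-time snapshot.
    std::unique_lock<std::shared_mutex> guard(snapshot_mutex);
    return counter;
}
```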

Comment on lines +375 to +508
tl::expected<std::vector<uint8_t>, SerializationError>
SegmentSerializer::Serialize() {
if (!segment_manager_) {
return tl::unexpected(SerializationError(
ErrorCode::SERIALIZE_FAIL,
"serialize SegmentManager segment_manager_ is null"));
}

if (segment_manager_->memory_allocator_ != BufferAllocatorType::OFFSET) {
return tl::unexpected(SerializationError(
ErrorCode::SERIALIZE_UNSUPPORTED,
"serialize SegmentManager memory_allocator_ is not offset"));
}

msgpack::sbuffer sbuf;
msgpack::packer<msgpack::sbuffer> packer(&sbuf);

// Create map containing all data
packer.pack_map(
5); // 5 fields: memory_allocator, mounted_segments, client_segments,
// allocator_names, local_disk_segments

// Serialize memory_allocator_
packer.pack("ma");
packer.pack(static_cast<int32_t>(segment_manager_->memory_allocator_));

// Serialize allocator_manager_'s names_ order
packer.pack("an"); // allocator_names
const auto& names = segment_manager_->allocator_manager_.getNames();
packer.pack_array(names.size());
for (const auto& name : names) {
packer.pack(name);
}

// Serialize mounted_segments_
// Sort UUIDs first to ensure deterministic serialization results
packer.pack("ms");
packer.pack_map(segment_manager_->mounted_segments_.size());

// Collect all segment UUIDs and sort
std::vector<UUID> sorted_segment_uuids;
sorted_segment_uuids.reserve(segment_manager_->mounted_segments_.size());
for (const auto& pair : segment_manager_->mounted_segments_) {
sorted_segment_uuids.push_back(pair.first);
}
std::sort(sorted_segment_uuids.begin(), sorted_segment_uuids.end());

for (const auto& segment_uuid : sorted_segment_uuids) {
const auto& mounted_segment =
segment_manager_->mounted_segments_.at(segment_uuid);
std::string uuid_str = UuidToString(segment_uuid);
packer.pack(uuid_str);

// Serialize MountedSegment object
auto result =
Serializer<MountedSegment>::serialize(mounted_segment, packer);
if (!result) {
return tl::unexpected(result.error());
}
}

// Serialize client_segments_
// Sort client UUIDs first to ensure deterministic serialization results
packer.pack("cs");
packer.pack_map(segment_manager_->client_segments_.size());

// Collect all client UUIDs and sort
std::vector<UUID> sorted_client_uuids;
sorted_client_uuids.reserve(segment_manager_->client_segments_.size());
for (const auto& pair : segment_manager_->client_segments_) {
sorted_client_uuids.push_back(pair.first);
}
std::sort(sorted_client_uuids.begin(), sorted_client_uuids.end());

for (const auto& client_uuid : sorted_client_uuids) {
const auto& segment_ids =
segment_manager_->client_segments_.at(client_uuid);
std::string client_uuid_str = UuidToString(client_uuid);
packer.pack(client_uuid_str);

// Serialize segment IDs array
packer.pack_array(segment_ids.size());

for (const auto& segment_id : segment_ids) {
std::string segment_uuid_str = UuidToString(segment_id);
packer.pack(segment_uuid_str);
}
}

// Serialize client_local_disk_segment_
// Sort client UUIDs first to ensure deterministic serialization results
packer.pack("ld"); // local_disk_segments
packer.pack_map(segment_manager_->client_local_disk_segment_.size());

// Collect all client UUIDs and sort
std::vector<UUID> sorted_ld_uuids;
sorted_ld_uuids.reserve(
segment_manager_->client_local_disk_segment_.size());
for (const auto& pair : segment_manager_->client_local_disk_segment_) {
sorted_ld_uuids.push_back(pair.first);
}
std::sort(sorted_ld_uuids.begin(), sorted_ld_uuids.end());

for (const auto& client_uuid : sorted_ld_uuids) {
const auto& segment =
segment_manager_->client_local_disk_segment_.at(client_uuid);
packer.pack(UuidToString(client_uuid));

// Serialize LocalDiskSegment: [enable_offloading, count, key1, ts1,
// key2, ts2, ...] Sort keys to ensure determinism
std::vector<std::string> sorted_keys;
for (const auto& [key, ts] : segment->offloading_objects) {
sorted_keys.push_back(key);
}
std::sort(sorted_keys.begin(), sorted_keys.end());

packer.pack_array(2 + sorted_keys.size() * 2);
packer.pack(segment->enable_offloading);
packer.pack(static_cast<uint64_t>(sorted_keys.size()));

for (const auto& key : sorted_keys) {
packer.pack(key);
packer.pack(segment->offloading_objects.at(key));
}
}

// Compress entire data
std::vector<uint8_t> compressed_data = zstd_compress(
reinterpret_cast<const uint8_t*>(sbuf.data()), sbuf.size(), 3);

// Return compressed data
return std::vector<uint8_t>(
std::make_move_iterator(compressed_data.begin()),
std::make_move_iterator(compressed_data.end()));

medium

The SegmentSerializer::Serialize method uses MessagePack with ZSTD compression for the entire segment manager state. This is a good approach for efficiency. The serialization logic sorts UUIDs and keys to ensure deterministic output, which is vital for consistent snapshots and reliable comparisons.
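The determinism technique noted above can be shown in a self-contained way. An illustrative sketch, not the PR's code: an unordered_map has no stable iteration order, so keys are copied out and sorted before serialization; two maps with equal contents then always produce byte-identical output. MessagePack is replaced here by a trivial hypothetical "key=value;" encoding so the example needs no third-party library.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Serializes a map deterministically by visiting keys in sorted order,
// regardless of the hash table's internal bucket layout.
std::string serialize_deterministic(
    const std::unordered_map<std::string, std::string>& m) {
    std::vector<std::string> keys;
    keys.reserve(m.size());
    for (const auto& kv : m) keys.push_back(kv.first);
    std::sort(keys.begin(), keys.end());  // stable order regardless of hashing

    std::string out;
    for (const auto& k : keys) {
        out += k + "=" + m.at(k) + ";";
    }
    return out;
}
```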

Comment on lines +511 to +848
tl::expected<void, SerializationError> SegmentSerializer::Deserialize(
const std::vector<uint8_t>& data) {
// Decompress data
std::vector<uint8_t> decompressed_data;
try {
decompressed_data = zstd_decompress(
reinterpret_cast<const uint8_t*>(data.data()), data.size());
} catch (const std::exception& e) {
return tl::make_unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager failed to "
"decompress MessagePack data: " +
std::string(e.what())));
}

// Parse MessagePack data
msgpack::object_handle oh;
try {
oh = msgpack::unpack(
reinterpret_cast<const char*>(decompressed_data.data()),
decompressed_data.size());
} catch (const std::exception& e) {
return tl::make_unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager failed to unpack MessagePack data: " +
std::string(e.what())));
}

const msgpack::object& obj = oh.get();

// Check if it's a map
if (obj.type != msgpack::type::MAP) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager invalid MessagePack "
"format: expected map"));
}

if (!segment_manager_) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager segment_manager is null"));
}

// Clear existing data
segment_manager_->mounted_segments_.clear();
segment_manager_->client_segments_.clear();

// Convert MessagePack map to regular map, use pointers for values to avoid
// copying
std::map<std::string, const msgpack::object*> fields_map;
for (uint32_t i = 0; i < obj.via.map.size; ++i) {
const msgpack::object& key_obj = obj.via.map.ptr[i].key;
const msgpack::object& value_obj = obj.via.map.ptr[i].val;

// Ensure key is a string
if (key_obj.type == msgpack::type::STR) {
std::string key(key_obj.via.str.ptr, key_obj.via.str.size);
fields_map.emplace(std::move(key), &value_obj);
}
}

// Process fields in order
// 1. First process memory_allocator_
auto allocator_it = fields_map.find("ma");
if (allocator_it != fields_map.end()) {
const msgpack::object* value_obj = allocator_it->second;
if (value_obj->type != msgpack::type::POSITIVE_INTEGER) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager memory_allocator is not int"));
}

auto allocatorType =
static_cast<BufferAllocatorType>(value_obj->as<int32_t>());

// Note: Type must match, only OffsetAllocator is supported
if (allocatorType != segment_manager_->memory_allocator_) {
LOG(ERROR) << "deserialize memory allocator type doesn't match "
"current setting";
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager memory allocator type doesn't "
"match current setting"));
}
}

// 1.5 Parse allocator_names (saved original names_ order)
std::vector<std::string> saved_allocator_names;
auto allocator_names_it = fields_map.find("an");
if (allocator_names_it != fields_map.end()) {
const msgpack::object* value_obj = allocator_names_it->second;
if (value_obj->type != msgpack::type::ARRAY) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager allocator_names is not array"));
}

saved_allocator_names.reserve(value_obj->via.array.size);
for (uint32_t i = 0; i < value_obj->via.array.size; ++i) {
const msgpack::object& name_obj = value_obj->via.array.ptr[i];
if (name_obj.type != msgpack::type::STR) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager allocator_name is not string"));
}
saved_allocator_names.emplace_back(name_obj.via.str.ptr,
name_obj.via.str.size);
}
}

// 2. Process mounted_segments_
auto mounted_segments_it = fields_map.find("ms");
if (mounted_segments_it != fields_map.end()) {
const msgpack::object* value_obj = mounted_segments_it->second;
if (value_obj->type != msgpack::type::MAP) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager mounted_segments is not map"));
}

// Iterate through all mounted segments
for (uint32_t j = 0; j < value_obj->via.map.size; ++j) {
const msgpack::object& segment_key = value_obj->via.map.ptr[j].key;
const msgpack::object& segment_value =
value_obj->via.map.ptr[j].val;

// Ensure key is a string (UUID)
if (segment_key.type != msgpack::type::STR) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager "
"mounted_segments key is not string"));
}

std::string uuid_str(segment_key.via.str.ptr,
segment_key.via.str.size);
UUID segment_uuid;
if (!StringToUuid(uuid_str, segment_uuid)) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize SegmentManager mounted_segments "
"uuid {} is invalid",
uuid_str)));
}

// Deserialize MountedSegment object
auto result =
Serializer<MountedSegment>::deserialize(segment_value);
if (!result) {
return tl::unexpected(result.error());
}

segment_manager_->mounted_segments_.emplace(
segment_uuid, std::move(result.value()));
}
}

// 3. Process client_segments_
auto client_segments_it = fields_map.find("cs");
if (client_segments_it != fields_map.end()) {
const msgpack::object* value_obj = client_segments_it->second;
if (value_obj->type != msgpack::type::MAP) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager client_segments is not map"));
}

// Iterate through all client segments
for (uint32_t j = 0; j < value_obj->via.map.size; ++j) {
const msgpack::object& client_key = value_obj->via.map.ptr[j].key;
const msgpack::object& client_value = value_obj->via.map.ptr[j].val;

// Ensure key is a string (UUID)
if (client_key.type != msgpack::type::STR) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager "
"client_segments key is not string"));
}

std::string client_uuid_str(client_key.via.str.ptr,
client_key.via.str.size);
UUID client_uuid;
if (!StringToUuid(client_uuid_str, client_uuid)) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize SegmentManager client_segments "
"client uuid {} is invalid",
client_uuid_str)));
}

// Ensure value is an array
if (client_value.type != msgpack::type::ARRAY) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager "
"client_segments value is not array"));
}

// Deserialize segment IDs array
std::vector<UUID> segment_ids;
segment_ids.reserve(client_value.via.array.size);

for (uint32_t k = 0; k < client_value.via.array.size; ++k) {
const msgpack::object& segment_id_obj =
client_value.via.array.ptr[k];

if (segment_id_obj.type != msgpack::type::STR) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager segment_id is not string"));
}

std::string segment_uuid_str(segment_id_obj.via.str.ptr,
segment_id_obj.via.str.size);
UUID segment_uuid;
if (!StringToUuid(segment_uuid_str, segment_uuid)) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize SegmentManager segment_id {} "
"is invalid",
segment_uuid_str)));
}

segment_ids.push_back(segment_uuid);
}

segment_manager_->client_segments_.emplace(client_uuid,
std::move(segment_ids));
}
}

// Restore allocator_manager_ based on mounted_segments_ and saved names
// order
segment_manager_->allocator_manager_ = AllocatorManager();

// Add allocators in saved original order
for (const auto& name : saved_allocator_names) {
for (auto& pair : segment_manager_->mounted_segments_) {
MountedSegment& mounted_segment = pair.second;
if (mounted_segment.segment.name == name &&
mounted_segment.status == SegmentStatus::OK &&
mounted_segment.buf_allocator) {
segment_manager_->allocator_manager_.addAllocator(
name, mounted_segment.buf_allocator);
break;
}
}
}

// Rebuild client_by_name_ based on client_segments_ and mounted_segments_
segment_manager_->client_by_name_.clear();
for (const auto& [client_id, segment_ids] :
segment_manager_->client_segments_) {
for (const auto& segment_id : segment_ids) {
auto it = segment_manager_->mounted_segments_.find(segment_id);
if (it != segment_manager_->mounted_segments_.end()) {
segment_manager_->client_by_name_[it->second.segment.name] =
client_id;
}
}
}

// 4. Process client_local_disk_segment_
segment_manager_->client_local_disk_segment_.clear();
auto ld_it = fields_map.find("ld");
if (ld_it != fields_map.end()) {
const msgpack::object* ld_obj = ld_it->second;
if (ld_obj->type != msgpack::type::MAP) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize SegmentManager local_disk_segments is not map"));
}

for (uint32_t j = 0; j < ld_obj->via.map.size; ++j) {
const msgpack::object& client_key = ld_obj->via.map.ptr[j].key;
const msgpack::object& client_value = ld_obj->via.map.ptr[j].val;

// Parse client_id
if (client_key.type != msgpack::type::STR) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize local_disk_segments "
"client key is not string"));
}

std::string client_uuid_str(client_key.via.str.ptr,
client_key.via.str.size);
UUID client_id;
if (!StringToUuid(client_uuid_str, client_id)) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize local_disk_segments "
"client uuid {} is invalid",
client_uuid_str)));
}

// Parse LocalDiskSegment array: [enable_offloading, count, key1,
// ts1, ...]
if (client_value.type != msgpack::type::ARRAY ||
client_value.via.array.size < 2) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize local_disk_segments "
"value is not valid array"));
}

bool enable_offloading = client_value.via.array.ptr[0].as<bool>();
uint64_t count = client_value.via.array.ptr[1].as<uint64_t>();

auto segment =
std::make_shared<LocalDiskSegment>(enable_offloading);

// Parse offloading_objects
for (uint64_t k = 0; k < count; ++k) {
size_t key_idx = 2 + k * 2;
size_t ts_idx = 2 + k * 2 + 1;
if (ts_idx >= client_value.via.array.size) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize local_disk_segments "
"offloading_objects out of bounds"));
}

std::string key(
client_value.via.array.ptr[key_idx].via.str.ptr,
client_value.via.array.ptr[key_idx].via.str.size);
int64_t ts = client_value.via.array.ptr[ts_idx].as<int64_t>();
segment->offloading_objects[key] = ts;
}

segment_manager_->client_local_disk_segment_[client_id] =
std::move(segment);
}
}

return {};

medium

The SegmentSerializer::Deserialize method correctly handles decompression and MessagePack parsing. The reconstruction of allocator_manager_ and client_by_name_ based on the deserialized mounted_segments_ and client_segments_ is complex but necessary. The validation checks for MessagePack types and array sizes are good for robustness.

Comment on lines 13 to 86
tl::expected<void, SerializationError>
Serializer<offset_allocator::__Allocator>::serialize(
const offset_allocator::__Allocator &allocator, MsgpackPacker &packer) {
// Use array instead of map for more compact storage
// Array order is consistent with deserialize_msgpack
packer.pack_array(10);

// Basic properties (packed in order)
packer.pack(allocator.m_size);
packer.pack(allocator.m_current_capacity);
packer.pack(allocator.m_max_capacity);
packer.pack(allocator.m_freeStorage);
packer.pack(allocator.m_usedBinsTop);

// usedBins array
packer.pack_array(offset_allocator::NUM_TOP_BINS);
for (unsigned char m_usedBin : allocator.m_usedBins) {
packer.pack(m_usedBin);
}

// binIndex array
packer.pack_array(offset_allocator::NUM_LEAF_BINS);
for (unsigned int m_binIndex : allocator.m_binIndices) {
packer.pack(m_binIndex);
}

// nodes data serialization and compression
std::vector<uint8_t> serialized_nodes;
serialized_nodes.reserve(allocator.m_max_capacity * 25);

for (uint32_t i = 0; i < allocator.m_current_capacity; i++) {
const auto &node = allocator.m_nodes[i];
serialized_nodes.push_back(node.used ? 1 : 0);
SerializationHelper::serializeUint32(node.dataOffset, serialized_nodes);
SerializationHelper::serializeUint32(node.dataSize, serialized_nodes);
SerializationHelper::serializeUint32(node.binListPrev,
serialized_nodes);
SerializationHelper::serializeUint32(node.binListNext,
serialized_nodes);
SerializationHelper::serializeUint32(node.neighborPrev,
serialized_nodes);
SerializationHelper::serializeUint32(node.neighborNext,
serialized_nodes);
}

try {
std::vector<uint8_t> compressed_nodes =
zstd_compress(serialized_nodes, 3);
packer.pack(compressed_nodes);

// freeNodes data serialization and compression
std::vector<uint8_t> serialized_free_nodes;
serialized_free_nodes.reserve(allocator.m_current_capacity * 4);

for (uint32_t i = 0; i < allocator.m_current_capacity; i++) {
SerializationHelper::serializeUint32(allocator.m_freeNodes[i],
serialized_free_nodes);
}

std::vector<uint8_t> compressed_free_nodes =
zstd_compress(serialized_free_nodes, 3);
packer.pack(compressed_free_nodes);
} catch (const std::exception &e) {
return tl::unexpected(SerializationError(
ErrorCode::SERIALIZE_FAIL,
std::string(
"offset_allocator::__Allocator, error compressing nodes: ") +
e.what()));
}

// freeOffset
packer.pack(allocator.m_freeOffset);

return {};
Contributor

medium

The Serializer<offset_allocator::__Allocator>::serialize method uses a custom binary format within MessagePack by manually serializing Node and NodeIndex fields using SerializationHelper::serializeUint32 and then compressing the raw byte vectors with ZSTD. While this might be efficient, it makes the serialization format tightly coupled to the Node struct's internal layout and the SerializationHelper's implementation. This approach is more brittle to changes and less readable than letting MessagePack handle more structured data directly (e.g., packing Node objects as MessagePack arrays or maps).
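The `SerializationHelper` routines the hunk calls are not shown here; as a rough sketch (function names assumed from the call sites), the fixed-width little-endian encoding that keeps the format stable across host endianness might look like:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for SerializationHelper: fixed little-endian
// byte order, independent of host endianness.
static void serializeUint32(uint32_t v, std::vector<uint8_t>& out) {
    out.push_back(static_cast<uint8_t>(v & 0xFF));
    out.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
    out.push_back(static_cast<uint8_t>((v >> 16) & 0xFF));
    out.push_back(static_cast<uint8_t>((v >> 24) & 0xFF));
}

static uint32_t deserializeUint32(const uint8_t* p) {
    return static_cast<uint32_t>(p[0]) |
           (static_cast<uint32_t>(p[1]) << 8) |
           (static_cast<uint32_t>(p[2]) << 16) |
           (static_cast<uint32_t>(p[3]) << 24);
}
```

Any drift between these two functions (order, width, endianness) silently corrupts restored allocator state, which is the coupling risk the comment above describes.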

Comment on lines 90 to 318
tl::expected<std::unique_ptr<offset_allocator::__Allocator>, SerializationError>
Serializer<offset_allocator::__Allocator>::deserialize(
const msgpack::object &obj) {
// Check if object type is array (consistent with serialize_msgpack)
if (obj.type != msgpack::type::ARRAY) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize offset_allocator::__Allocator "
"invalid msgpack data, expected array"));
}

// Verify array size is correct
if (obj.via.array.size != 10) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator invalid "
"array size: expected 9, got {}",
obj.via.array.size)));
}

auto *array_items = obj.via.array.ptr;
size_t index = 0;

// Deserialize basic properties
uint32_t size = array_items[index++].as<uint32_t>();
uint32_t current_capacity = array_items[index++].as<uint32_t>();
uint32_t max_capacity = array_items[index++].as<uint32_t>();

// Create allocator object
auto allocator = std::make_unique<offset_allocator::__Allocator>(
size, current_capacity, max_capacity);

allocator->m_freeStorage = array_items[index++].as<uint32_t>();
allocator->m_usedBinsTop = array_items[index++].as<uint32_t>();

// Deserialize usedBins array
const auto &used_bins_array = array_items[index++];
if (used_bins_array.type != msgpack::type::ARRAY) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize offset_allocator::__Allocator "
"usedBins is not an array"));
}

if (used_bins_array.via.array.size != offset_allocator::NUM_TOP_BINS) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator usedBins "
"invalid size: expected {}, got {}",
offset_allocator::NUM_TOP_BINS,
used_bins_array.via.array.size)));
}

for (uint32_t i = 0; i < used_bins_array.via.array.size &&
i < offset_allocator::NUM_TOP_BINS;
i++) {
allocator->m_usedBins[i] =
used_bins_array.via.array.ptr[i].as<uint8_t>();
}

// Deserialize binIndices array
const auto &bin_indices_array = array_items[index++];
if (bin_indices_array.type != msgpack::type::ARRAY) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize offset_allocator::__Allocator "
"binIndices is not an array"));
}

if (bin_indices_array.via.array.size != offset_allocator::NUM_LEAF_BINS) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator binIndices "
"invalid size: expected {}, got {}",
offset_allocator::NUM_LEAF_BINS,
bin_indices_array.via.array.size)));
}

for (uint32_t i = 0; i < bin_indices_array.via.array.size &&
i < offset_allocator::NUM_LEAF_BINS;
i++) {
allocator->m_binIndices[i] =
bin_indices_array.via.array.ptr[i].as<uint32_t>();
}

try {
// Deserialize compressed nodes data
const auto &nodes_bin = array_items[index++];
if (nodes_bin.type != msgpack::type::BIN) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize offset_allocator::__Allocator "
"nodes data is not binary"));
}

// Create copy of compressed data
std::vector<uint8_t> compressed_data(
reinterpret_cast<const uint8_t *>(nodes_bin.via.bin.ptr),
reinterpret_cast<const uint8_t *>(nodes_bin.via.bin.ptr) +
nodes_bin.via.bin.size);

// Decompress data
std::vector<uint8_t> serialized_nodes =
zstd_decompress(compressed_data);

// Verify decompressed data size is reasonable
if (serialized_nodes.size() % 25 != 0) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator invalid "
"serialized nodes data size: expected multiple of "
"25, actual size: {}",
serialized_nodes.size())));
}

if (serialized_nodes.size() / 25 != current_capacity) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator invalid "
"serialized nodes data size: expected quotient of "
"{}, actual quotient: {}",
current_capacity, serialized_nodes.size() / 25)));
}

// Deserialize nodes array in standardized format
size_t offset = 0;
for (uint32_t i = 0; i < current_capacity; i++) {
if (offset + 25 > serialized_nodes.size()) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator "
"incomplete serialized nodes data at index {}",
i)));
}

// Deserialize bool field
allocator->m_nodes[i].used = (serialized_nodes[offset++] != 0);

// Deserialize uint32_t field
allocator->m_nodes[i].dataOffset =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
allocator->m_nodes[i].dataSize =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
allocator->m_nodes[i].binListPrev =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
allocator->m_nodes[i].binListNext =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
allocator->m_nodes[i].neighborPrev =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
allocator->m_nodes[i].neighborNext =
SerializationHelper::deserializeUint32(
&serialized_nodes[offset]);
offset += 4;
}
} catch (const std::exception &e) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator error "
"decompressing nodes: {}",
e.what())));
}

try {
// Deserialize compressed freeNodes data
const auto &free_nodes_bin = array_items[index++];
if (free_nodes_bin.type != msgpack::type::BIN) {
return tl::unexpected(
SerializationError(ErrorCode::DESERIALIZE_FAIL,
"deserialize offset_allocator::__Allocator "
"freeNodes data is not binary"));
}

// Create copy of compressed data
std::vector<uint8_t> compressed_data(
reinterpret_cast<const uint8_t *>(free_nodes_bin.via.bin.ptr),
reinterpret_cast<const uint8_t *>(free_nodes_bin.via.bin.ptr) +
free_nodes_bin.via.bin.size);

// Decompress data
std::vector<uint8_t> serialized_free_nodes =
zstd_decompress(compressed_data);

// Verify decompressed data size is reasonable
if (serialized_free_nodes.size() != current_capacity * 4) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format(
"deserialize offset_allocator::__Allocator invalid "
"serialized free nodes data size: expected {}, actual {}",
current_capacity * 4, serialized_free_nodes.size())));
}

// Deserialize freeNodes array in standardized format
size_t offset = 0;
for (uint32_t i = 0; i < current_capacity; i++) {
if (offset + 4 > serialized_free_nodes.size()) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format(
"deserialize offset_allocator::__Allocator incomplete "
"serialized free nodes data at index {}",
i)));
}
allocator->m_freeNodes[i] = SerializationHelper::deserializeUint32(
&serialized_free_nodes[offset]);
offset += 4;
}
} catch (const std::exception &e) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator error "
"processing free nodes: {}",
e.what())));
}

// Deserialize freeOffset
allocator->m_freeOffset = array_items[index++].as<uint32_t>();

return allocator;
Contributor

medium

The Serializer<offset_allocator::__Allocator>::deserialize method manually deserializes the custom binary format for Node and NodeIndex fields after ZSTD decompression. This requires strict adherence to the serialization order and byte-level interpretation. Any mismatch between serialization and deserialization logic could lead to data corruption. Consider adding a version field to the serialized data to handle schema evolution more gracefully.
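A minimal sketch of the suggested version field (the constant and names are assumptions, not code from this PR): pack a format version first, and fail fast before interpreting the raw node bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>

// Assumed constant: bump whenever the node byte layout changes.
constexpr uint32_t kAllocatorFormatVersion = 1;

// Sketch: validate a version read from the serialized header before
// touching the node data; return an error message on mismatch.
std::optional<std::string> check_format_version(uint32_t found) {
    if (found != kAllocatorFormatVersion) {
        return "unsupported allocator snapshot version " +
               std::to_string(found) + ", expected " +
               std::to_string(kAllocatorFormatVersion);
    }
    return std::nullopt;
}
```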


using UUID = std::pair<uint64_t, uint64_t>;

inline std::shared_mutex SNAPSHOT_MUTEX; // Snapshot mutex
Contributor

medium

The introduction of a global SNAPSHOT_MUTEX is a significant design decision. While it's used to protect the state during snapshot operations, global mutable state can increase complexity in large codebases and make reasoning about thread safety more challenging. Consider encapsulating this mutex within the MasterService class or a dedicated snapshot manager if possible, to limit its scope and improve modularity.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a comprehensive snapshot and restore mechanism for the Master Service, a critical feature for reducing cache warm-up time and improving service availability. However, several critical security and stability issues have been identified. These include a potential deadlock in the snapshot child process, a path traversal vulnerability in the local file storage backend, and potential Denial of Service via unbounded memory allocation during decompression and S3 downloads, primarily stemming from the use of fork() in a multi-threaded environment and insufficient data validation. Additionally, a potential issue with a global mutex for snapshotting, memory optimization for large S3 uploads, and minor inconsistencies in the serialization code were noted.

std::unique_lock<std::shared_mutex> lock(SNAPSHOT_MUTEX);
LOG(INFO) << "[Snapshot] Locking snapshot mutex, snapshot_id="
<< snapshot_id;
pid = fork();
Contributor

security-high high

The fork() call here is performed while holding an exclusive lock on SNAPSHOT_MUTEX. While this ensures consistency for metadata protected by this mutex, the child process subsequently attempts to serialize the TaskManager state in PersistState, which involves acquiring task_manager_.mutex_. Several methods in MasterService (such as FetchTasks, MarkTaskToComplete, and QueryTask) and the TaskCleanupThreadFunc acquire task_manager_.mutex_ without holding SNAPSHOT_MUTEX. If any of these threads hold the lock during fork(), the child process will deadlock when it attempts to acquire it during serialization.

To fix this, ensure that all code paths acquiring task_manager_.mutex_ also acquire a shared lock on SNAPSHOT_MUTEX to prevent forking while the lock is held.

Collaborator

To implement the fix with minimal impact, should we apply `std::shared_lock<std::shared_mutex>` at the entries of FetchTasks, MarkTaskToComplete, QueryTask, and TaskCleanupThreadFunc? That way, fork() (which holds the unique lock) waits for all task_manager_.mutex_ holders to finish, guaranteeing a clean state in the child process.

Collaborator Author

QueryTask doesn't need a snapshot lock because it's a read-only function. Locking of SNAPSHOT_MUTEX has been added elsewhere.

Comment on lines +120 to +123
std::string LocalFileBackend::KeyToPath(const std::string& key) const {
// key format: "master_snapshot/20231201_123456_000/metadata"
// converts to: "/base_path/master_snapshot/20231201_123456_000/metadata"
return base_path_ + "/" + key;
Contributor

security-high high

The LocalFileBackend implementation lacks path traversal protection in its UploadBuffer, DownloadBuffer, UploadString, and DownloadString methods. The KeyToPath method simply concatenates the base_path_ with the provided key. Since MasterService::RestoreState uses the content of latest.txt from the backend as a state_id to construct keys, a compromised backend could lead to arbitrary file read/write on the host system by injecting .. sequences or null bytes into latest.txt.

Implement strict validation for all keys and ensure the resolved canonical path remains within base_path_, similar to the check already present in DeleteObjectsWithPrefix.
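A purely lexical check along these lines would reject escaping keys without touching the disk (the function name and policy details here are illustrative, not the PR's code):

```cpp
#include <algorithm>
#include <cassert>
#include <filesystem>
#include <string>

// Sketch: reject empty keys, keys with embedded NUL bytes, absolute
// keys, and keys whose normalized path escapes base_path.
bool IsSafeKey(const std::string& base_path, const std::string& key) {
    if (key.empty() || key.find('\0') != std::string::npos) return false;
    namespace fs = std::filesystem;
    fs::path p(key);
    if (p.is_absolute()) return false;
    // Collapse "a/../b" sequences lexically (no filesystem access).
    fs::path resolved = (fs::path(base_path) / p).lexically_normal();
    fs::path base = fs::path(base_path).lexically_normal();
    // Every component of base must prefix the resolved path.
    auto mismatch_pair = std::mismatch(base.begin(), base.end(),
                                       resolved.begin(), resolved.end());
    return mismatch_pair.first == base.end();
}
```

A lexical check like this catches `..` injection via `latest.txt`; production code may additionally want to canonicalize symlinks with `std::filesystem::weakly_canonical` before the comparison.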


using UUID = std::pair<uint64_t, uint64_t>;

inline std::shared_mutex SNAPSHOT_MUTEX; // Snapshot mutex
Contributor

high

The SNAPSHOT_MUTEX is defined as a global inline variable. This can lead to contention and incorrect behavior if multiple MasterService instances are created within the same process (e.g., in tests), as they would all share the same lock. To improve encapsulation and prevent potential race conditions, this mutex should be a member of the MasterService class.

To fix this, you can add a mutable std::shared_mutex snapshot_mutex_; member to MasterService and replace all uses of the global SNAPSHOT_MUTEX with this new member.

Collaborator

+1 to Gemini's suggestion. Though running multiple MasterService instances in one process is rare in production, making SNAPSHOT_MUTEX a member adds negligible overhead (just one std::shared_mutex per instance).

Collaborator Author

Applied! Thanks for catching this.

Comment on lines +3155 to +3157
auto decompressed_data = zstd_decompress(
reinterpret_cast<const uint8_t*>(shard_data_obj.via.bin.ptr),
shard_data_obj.via.bin.size);
Contributor

security-medium medium

The MetadataSerializer::Deserialize method calls zstd_decompress without specifying a max_decompressed_size. This allows a malicious snapshot to specify an extremely large decompressed size in the zstd header, causing the Master Service to attempt a massive memory allocation and crash (OOM). A similar issue exists in SegmentSerializer::Deserialize in mooncake-store/src/segment.cpp.

Always use the zstd_decompress overload that accepts a max_decompressed_size parameter and enforce a reasonable limit (e.g., 1GB).
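The gatekeeping step can be sketched independently of the decompression itself: check the frame's declared content size against a cap before allocating. The sentinel values below mirror zstd's `ZSTD_CONTENTSIZE_UNKNOWN` / `ZSTD_CONTENTSIZE_ERROR` (defined in `zstd.h` as `0ULL - 1` and `0ULL - 2`); real code should include `<zstd.h>` and call `ZSTD_getFrameContentSize` to obtain `frame_size`.

```cpp
#include <cassert>

// Local mirrors of zstd's sentinels for a self-contained sketch.
constexpr unsigned long long kContentSizeUnknown = 0ULL - 1;
constexpr unsigned long long kContentSizeError   = 0ULL - 2;

// Sketch: decide whether a frame's declared decompressed size may be
// allocated. Unknown/error sizes and anything above the cap are
// rejected, so a hostile snapshot cannot force a huge allocation.
bool DecompressedSizeAllowed(unsigned long long frame_size,
                             unsigned long long max_size) {
    if (frame_size == kContentSizeUnknown || frame_size == kContentSizeError)
        return false;
    return frame_size <= max_size;
}
```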

fmt::format("Invalid content length received: {}", size));
}

buffer.resize(static_cast<size_t>(size));
Contributor

security-medium medium

The S3Helper::DownloadBuffer and DownloadBufferMultipart methods resize the output buffer based on the Content-Length from S3 without any validation. A malicious or compromised S3 backend could return an extremely large Content-Length, leading to an OOM crash during the resize() call.

Implement a maximum allowed size for S3 downloads and verify the Content-Length against this limit before allocating memory.

Comment on lines +1927 to +1977
// Use non-blocking polling to wait
while (true) {
// Read child logs first
flush_child_logs();

int status;
pid_t result = waitpid(pid, &status, WNOHANG);

if (result == -1) {
LOG(ERROR) << "[Snapshot] Failed to wait for child process: "
<< strerror(errno) << ", snapshot_id=" << snapshot_id
<< ", child_pid=" << pid;
MasterMetricManager::instance().inc_snapshot_fail();
return;
} else if (result == 0) {
// Child process is still running
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start_time)
.count();

if (elapsed >= timeout_seconds) {
// Timeout handling - flush remaining logs before killing
flush_child_logs();
if (!log_buffer.empty()) {
LOG(INFO) << "[Snapshot:Child] " << log_buffer;
}
HandleChildTimeout(pid, snapshot_id);
MasterMetricManager::instance().inc_snapshot_fail();
return;
}

// Brief sleep before checking again
std::this_thread::sleep_for(std::chrono::seconds(2));
} else {
// Child process has exited
// Flush remaining logs from child
flush_child_logs();
// Output any remaining incomplete line
if (!log_buffer.empty()) {
LOG(INFO) << "[Snapshot:Child] " << log_buffer;
}

HandleChildExit(pid, status, snapshot_id);
auto elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_time)
.count();
MasterMetricManager::instance().set_snapshot_duration_ms(elapsed);
return;
}
}
Contributor

medium

The waitpid with WNOHANG in a loop creates a busy-wait pattern with a fixed 2-second sleep. This is inefficient and can introduce delays in detecting child process termination. A more efficient approach would be to use select() or poll() to wait on both the pipe file descriptor for logs and the child process status. However, a simpler improvement is to use waitpid in a blocking manner on a separate thread, which would be more responsive and resource-friendly than polling.
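One way to replace the fixed 2-second sleep (a sketch, not the PR's code): block on the log pipe with `poll()` bounded by a timeout, then probe `waitpid(WNOHANG)` whenever `poll` wakes up. This reacts immediately to new log data or the child closing its write end, instead of sleeping through it.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Sketch: wait until the child's log pipe is readable (or its write
// end was closed), or until timeout_ms elapses. Returns poll()'s
// result: >0 ready, 0 timeout, -1 error. The caller then reads logs
// and calls waitpid(pid, &status, WNOHANG) as before.
int WaitForLogsOrTimeout(int pipe_read_fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = pipe_read_fd;
    pfd.events = POLLIN;
    return poll(&pfd, 1, timeout_ms);
}
```

Hooking child-exit detection to pipe closure works because the child's exit closes its write end, which makes `poll` report the fd readable (EOF) without waiting for the timeout.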

Comment on lines 103 to 107
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator invalid "
"array size: expected 9, got {}",
obj.via.array.size)));
Contributor

medium

There's a minor typo in the error message. The check is for an array size of 10, but the message says "expected 9". This should be corrected to avoid confusion during debugging.

        fmt::format("deserialize offset_allocator::__Allocator invalid "
                    "array size: expected 10, got {}",
                    obj.via.array.size)));

Comment on lines +860 to +864
if (obj.via.array.size < 8) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
"deserialize MountedSegment invalid array size"));
}
Contributor

medium

The array size check obj.via.array.size < 8 is not strict enough. The serialization logic in Serializer<MountedSegment>::serialize always packs exactly 8 elements. For better data validation, it's recommended to check for exact equality using obj.via.array.size != 8.

    if (obj.via.array.size != 8) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            "deserialize MountedSegment invalid array size"));
    }

Comment on lines +544 to +549
// Use temporary stream
auto stream =
Aws::MakeShared<Aws::StringStream>("UploadPart");
stream->write(reinterpret_cast<const char *>(part_data),
current_part_size);
part_request.SetBody(stream);
Contributor

medium

The current implementation of UploadBufferMultipart creates an Aws::StringStream for each part, which involves copying the part data into a new in-memory buffer before uploading. For large files, this can double the memory consumption during the upload process.

To optimize this, you could implement a custom Aws::IOStream that reads directly from the source buffer slice (part_data) without making an intermediate copy. This can be achieved by creating a custom std::streambuf that wraps the existing memory buffer.
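The core of that suggestion can be sketched with a plain `std::streambuf` whose get area aliases the existing buffer slice (wiring it into `Aws::IOStream` is omitted; the class and function names here are illustrative):

```cpp
#include <cassert>
#include <istream>
#include <iterator>
#include <streambuf>
#include <string>

// Sketch: a streambuf whose get area aliases an existing buffer slice,
// so the stream handed to SetBody() needs no intermediate copy.
class BufferSliceBuf : public std::streambuf {
   public:
    BufferSliceBuf(const char* data, size_t size) {
        // setg() wants char*; the buffer is only ever read through the
        // get area, so the const_cast is not used to mutate.
        char* p = const_cast<char*>(data);
        setg(p, p, p + size);
    }
};

// Demonstration: stream the slice without copying it up front.
std::string ReadAll(const char* data, size_t size) {
    BufferSliceBuf buf(data, size);
    std::istream in(&buf);
    return std::string(std::istreambuf_iterator<char>(in),
                       std::istreambuf_iterator<char>());
}
```

For the multipart upload, one such streambuf per part over `part_data` would avoid the `Aws::StringStream` copy; the underlying buffer must simply outlive the request.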

@00fish0
Collaborator

00fish0 commented Feb 8, 2026

  1. The serialization part is a bit hard to follow, and there’s some repeated encode/decode patterns that could be extracted into small helpers. That makes long‑term maintenance harder. Not blocking this PR, but it might be worth a follow‑up refactor PR to clarify the structure and centralize common serialization checks/packing.

  2. The snapshot logic around copy/move semantics may need further refinement—especially concerning what data is persisted in snapshots, and how this will integrate with the oplog going forward.

Collaborator

@00fish0 00fish0 left a comment

Thanks for your contribution! I've left some comments below.


// nodes data serialization and compression
std::vector<uint8_t> serialized_nodes;
serialized_nodes.reserve(allocator.m_max_capacity * 25);
Collaborator

25 is a magic number here. Maybe 1 + 6 * sizeof(uint32_t) is better for clarity.

Collaborator Author

Fixed! Thanks for catching this.

Comment on lines +2344 to +2352
// 2. Download manifest.txt to parse protocol version info
std::string manifest_path = path_prefix + SNAPSHOT_MANIFEST_FILE;
std::string manifest_content;
if (!snapshot_backend_->DownloadString(manifest_path,
manifest_content)) {
LOG(ERROR) << "[Restore] Failed to download manifest file: "
<< manifest_path << " , starting fresh";
return;
}
Collaborator

Manifest parsing only logs serializer_type and version but doesn’t enforce compatibility. If field order/size changes, old snapshots may deserialize into corrupted state with unclear errors.
Could we add a strict check and fail fast on mismatch?

Collaborator Author

Good point! Currently, the serializer_type and version are validated at the start of the restore process.


// Decompress data
std::vector<uint8_t> serialized_nodes =
zstd_decompress(compressed_data);
Collaborator

Several deserialization paths in this file call zstd_decompress without a max output size.
There’s already a size-capped overload in zstd_util.h; can we use it here and define reasonable max sizes?

return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize offset_allocator::__Allocator invalid "
"array size: expected 9, got {}",
Collaborator

Minor: The error message says 'expected 9'. Could we update the error message?

Collaborator Author

Fixed! Thanks for catching this.

"content={}",
latest_path, snapshot_id, latest_content);

CleanupOldSnapshot(10, snapshot_id);
Collaborator

I have some thoughts regarding the snapshot retention policy. While keeping 10 snapshots is likely too many and will put significant pressure on etcd's storage space, I'm concerned that keeping only one might be risky due to:

Partial Writes: If a crash occurs during the snapshotting process, we might end up with a corrupted file.

Oplog Desynchronization: If the oplog isn't perfectly synced with the snapshot, we might need to replay from a previous, stable state.

What is the optimal number here? Perhaps 2 or 3 would be a better balance between safety and etcd's capacity constraints?

Collaborator Author

A new configuration option has been added: snapshot_retention_count, which sets the number of old snapshots to retain (default is 2).

// Child process
// Close read end, set write end for logging
close(log_pipe[0]);
g_snapshot_log_pipe_fd = log_pipe[1];
Collaborator

If logs are large, the pipe can fill up and the child will block in write(), which can trigger a false timeout and kill.
Should we set the child’s pipe fd to non‑blocking (drop logs on EAGAIN) or otherwise avoid blocking writes?
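The non-blocking option boils down to a small `fcntl` flip on the child's write end (a sketch of the suggestion, not this PR's code); with `O_NONBLOCK` set, a full pipe makes `write()` fail with `EAGAIN` and the log line can be dropped instead of blocking the snapshot child:

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// Sketch: mark a pipe fd non-blocking so writes to a full pipe return
// -1 with errno == EAGAIN instead of blocking the writer.
bool SetNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
```

The child would call this on `log_pipe[1]` right after `fork()` and treat `EAGAIN` from `write()` as "drop this log line".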

Comment on lines +3207 to +3210
void MasterService::MetadataSerializer::Reset() {
for (auto& shard : service_->metadata_shards_) {
shard.metadata.clear();
}
Collaborator

MetadataSerializer::Reset() only clears the metadata map. However, each shard also has processing_keys and replication_tasks, and there’s a global discarded_replicas_ list.
Should we clear them as part of Reset (or before restore) as well?

Collaborator Author

The reset function now includes handling for discarded_replicas_ and next_id_.

Since snapshots currently clean up dirty data by default, processing_keys and replication_tasks are not serialized and saved. If oplog features are to be supported in the future, these fields may require further processing.

return GetStorageConfigResponse(fsdir, enable_disk_eviction_, quota_bytes_);
}

auto MasterService::MountLocalDiskSegment(const UUID& client_id,
Collaborator

@00fish0 00fish0 Feb 8, 2026

Should we add shared_lock(SNAPSHOT_MUTEX) in this function?

Collaborator Author

Applied! Thanks for catching this.

return ss.str();
}

tl::expected<UUID, ErrorCode> MasterService::CreateCopyTask(
Collaborator

CreateCopyTask, CreateMoveTask, FetchTasks, MarkTaskToComplete, and TaskCleanupThreadFunc do not hold shared_lock(SNAPSHOT_MUTEX), but PersistState does serialize task_manager state. This means a snapshot fork can occur while these functions are modifying task state, leading to an inconsistent snapshot where task_manager and metadata are out of sync (e.g., a pending copy task referencing a replica that was already cleaned up during restore). Consider either adding shared_lock(SNAPSHOT_MUTEX) to these functions, or clearing non-finished tasks during restore.

Collaborator Author

Applied! Thanks for catching this. I added a shared_lock on SNAPSHOT_MUTEX to these functions.

Comment on lines +2493 to +2495
const bool skip_cleanup = std::getenv(
"MOONCAKE_MASTER_SERVICE_SNAPSHOT_TEST_SKIP_CLEANUP");
if (!skip_cleanup) {
Collaborator

After restore we unmount non‑OK segments, which invalidates allocators, but metadata isn’t immediately scanned for stale memory replicas. Cleanup only happens later on write paths via CleanupStaleHandles.

Would it make sense to call CleanupStaleHandles here?

Collaborator Author

CleanupStaleHandles is called inside the UnmountSegment function. Therefore, we do not need to call CleanupStaleHandles again.

@stmatengss
Collaborator

@maheshrbapatu @maheshreddybapatu This PR is relevant to you. PTAL if you're interested. THX

@maheshrbapatu
Collaborator

Thanks for adding me @stmatengss. I will take a look at this
