[Store] Implement Metadata Persistence and Recovery for Master Service #1431
yangdao479 wants to merge 60 commits into main
Conversation
…dic persistence:
- Add snapshot configuration options, including snapshot enablement, snapshot interval, and timeout duration.
- Implement the snapshot thread, which periodically forks child processes for state persistence.
- Add snapshot serialization logic to serialize metadata and segment information to S3.
- Introduce a file utility class that supports chunked writing of large files and automatic directory creation.
- Add an S3 helper class implementing multipart upload and object deletion.
- Add snapshot-related metric monitoring, such as snapshot duration and success/failure counts.
- Add cleanup of old snapshots, retaining a configurable number of historical snapshots.
- Change time retrieval to uniformly use system_clock instead of steady_clock.
- Add a snapshot recovery flag to control and support data recovery from snapshots.
- Introduce the zstd compression library dependency and integrate it into the build system.
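The fork-based persistence described above relies on copy-on-write semantics: the forked child inherits a frozen view of the parent's memory, so the master can keep serving requests while the child serializes. A minimal sketch of the pattern, not the PR's actual implementation (`snapshot_with_fork` and the output path are illustrative names):

```cpp
#include <sys/wait.h>
#include <unistd.h>
#include <fstream>
#include <string>

// Fork a child so the snapshot sees a copy-on-write view of the parent's
// state; the parent continues serving while the child persists the data.
bool snapshot_with_fork(const std::string& state, const std::string& path) {
    pid_t pid = fork();
    if (pid < 0) return false;      // fork failed
    if (pid == 0) {                 // child: persist the frozen view
        std::ofstream out(path, std::ios::binary);
        out << state;
        out.close();
        _exit(out.good() ? 0 : 1);  // _exit avoids flushing parent's buffers
    }
    int status = 0;                 // parent: reap the child
    if (waitpid(pid, &status, 0) != pid) return false;
    return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

In the real service the parent would typically wait asynchronously (or poll with `WNOHANG`) rather than block, and the child would stream to the configured backend instead of a local file.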
* [Store]: Abstract snapshot storage backend and implement local file backend
* [Store]: Fix bugs in master service snapshot and restore
* [Store]: Add missing fields for snapshot/restore functionality
  - Support DISK and LOCAL_DISK replica types in snapshot/restore
  - Add allocator_names to preserve allocator ordering
  - Add client_local_disk_segment_ snapshot/restore support
  - Add client_by_name_ reconstruction on restore
* [Store]: Add snapshot/restore tests for MasterService
  - Add a test base class for snapshot/restore verification
  - Port tests from master_service_test.cpp for snapshot/restore verification
  - Port tests from master_service_ssd_test.cpp for SSD snapshot/restore verification
  - Sort unordered_map keys before serialization for deterministic snapshots
* [Store]: Translate comments into English
Summary of Changes
Hello @yangdao479, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a snapshot and restore feature to the Mooncake Store's Master Service, enabling periodic backups of metadata and recovery from these snapshots to reduce restart times. It includes support for local and S3 storage backends, along with the necessary configuration and tests.
Code Review
This pull request introduces snapshot and restore functionality to the Mooncake Store, allowing periodic snapshots of master metadata and recovery from these snapshots to reduce cache warm-up time after a master restart. The changes add new configuration flags, define snapshot storage backend types (local or S3), implement serialization and deserialization logic for metadata and segments, and create a snapshot thread that periodically persists the master state. The review highlights the importance of the snapshot mutex for data consistency and the security checks in the local file backend; it questions the custom binary format embedded within MessagePack and suggests a more MessagePack-idiomatic approach for better maintainability. It also points out the use of a non-thread-safe function for timestamp formatting and suggests thread-safe alternatives.
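On the timestamp-formatting point: `std::localtime` returns a pointer to shared static storage, so two threads formatting timestamps concurrently can corrupt each other's results. A sketch of the thread-safe POSIX alternative the review alludes to (the function name and format string here are illustrative, not taken from the PR):

```cpp
#include <ctime>
#include <string>

// std::localtime is not thread-safe; localtime_r (POSIX) writes into a
// caller-owned struct tm instead of shared static storage.
std::string format_timestamp(std::time_t t) {
    std::tm tm_buf{};
    localtime_r(&t, &tm_buf);  // thread-safe variant
    char buf[32];
    std::strftime(buf, sizeof(buf), "%Y%m%d_%H%M%S", &tm_buf);
    return buf;
}
```

On Windows the equivalent is `localtime_s` (with reversed argument order); C++20 `std::chrono::zoned_time` is another thread-safe option.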
```cpp
        std::unique_lock<std::shared_mutex> lock(SNAPSHOT_MUTEX);
        LOG(INFO) << "[Snapshot] Locking snapshot mutex, snapshot_id="
                  << snapshot_id;
        pid = fork();
    }
```
```cpp
    const std::string& prefix) {
    std::string full_path = KeyToPath(prefix);

    try {
        // Security check: ensure path to delete is within base_path_
        std::error_code ec;
        fs::path canonical_base = fs::canonical(base_path_, ec);
        if (ec) {
            // base_path_ doesn't exist, try to create it and get canonical path
            fs::create_directories(base_path_);
            canonical_base = fs::canonical(base_path_, ec);
            if (ec) {
                return tl::make_unexpected(
                    fmt::format("Failed to resolve base path {}: {}",
                                base_path_, ec.message()));
            }
        }

        // Check if target path exists
        if (!fs::exists(full_path)) {
            // Path doesn't exist, treat as successful deletion
            return {};
        }

        fs::path canonical_target = fs::canonical(full_path, ec);
        if (ec) {
            return tl::make_unexpected(
                fmt::format("Failed to resolve target path {}: {}", full_path,
                            ec.message()));
        }

        // Verify target path is within base_path_
        std::string base_str = canonical_base.string();
        std::string target_str = canonical_target.string();

        // Ensure target path starts with base_path_
        if (target_str.length() < base_str.length() ||
            target_str.substr(0, base_str.length()) != base_str) {
            LOG(ERROR) << "Security violation: Attempted to delete path "
                          "outside base directory. "
                       << "base_path=" << base_str
                       << ", target_path=" << target_str;
            return tl::make_unexpected(fmt::format(
                "Security error: Path {} is outside base directory {}",
                full_path, base_path_));
        }

        // Don't allow deleting base_path_ itself
        if (target_str == base_str) {
            LOG(ERROR) << "Security violation: Attempted to delete base "
                          "directory itself. "
                       << "base_path=" << base_str;
            return tl::make_unexpected(fmt::format(
                "Security error: Cannot delete base directory {}", base_path_));
        }

        // Security check passed, execute delete operation
        if (fs::is_directory(full_path)) {
            auto removed_count = fs::remove_all(full_path, ec);
            if (ec) {
                return tl::make_unexpected(
                    fmt::format("Failed to remove directory {}: {}", full_path,
                                ec.message()));
            }
            VLOG(1) << "Removed directory: " << full_path
                    << ", items removed: " << removed_count;
        } else {
            // If it's a file, delete all files matching the prefix
            fs::path parent_dir = fs::path(full_path).parent_path();
            std::string prefix_name = fs::path(full_path).filename().string();

            // Verify parent directory is also within base_path_
            fs::path canonical_parent = fs::canonical(parent_dir, ec);
            if (ec || canonical_parent.string().substr(0, base_str.length()) !=
                          base_str) {
                return tl::make_unexpected(fmt::format(
                    "Security error: Parent path {} is outside base directory",
                    parent_dir.string()));
            }

            if (fs::exists(parent_dir) && fs::is_directory(parent_dir)) {
                for (const auto& entry : fs::directory_iterator(parent_dir)) {
                    std::string filename = entry.path().filename().string();
                    if (filename.find(prefix_name) == 0) {
                        fs::remove_all(entry.path(), ec);
                        if (ec) {
                            LOG(WARNING) << "Failed to remove: " << entry.path()
                                         << ", error: " << ec.message();
                        }
                    }
                }
            }
        }

        return {};
    } catch (const fs::filesystem_error& e) {
        return tl::make_unexpected(
            fmt::format("Filesystem error during delete: {}", e.what()));
    }
```
The LocalFileBackend::DeleteObjectsWithPrefix method includes robust security checks (fs::canonical, path prefix comparison) to prevent accidental or malicious deletion of files outside the designated base_path_ or the base_path_ itself. This is a critical security measure for file system operations.
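One subtlety with raw string-prefix comparisons on canonical paths is that `/data/base2` begins with `/data/base` as a string yet is not inside that directory. A component-wise check avoids the false match. This is an illustrative sketch (`is_within` is a hypothetical helper, not from the PR; note it treats the base directory itself as "within", which the PR guards against with a separate equality check):

```cpp
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Compare paths component-by-component rather than as raw strings, so
// "/data/base2" is not mistaken for a child of "/data/base". Both paths are
// assumed to already be canonical (symlinks and ".." resolved).
bool is_within(const fs::path& base, const fs::path& target) {
    fs::path rel = target.lexically_relative(base);
    // Empty => unrelated roots; leading ".." => target escapes base.
    return !rel.empty() && rel.native().rfind("..", 0) != 0;
}
```

Appending a trailing separator to the base string before the prefix test is another common fix for the same pitfall.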
```cpp
        return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
    }

    std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);
```

```cpp
tl::expected<void, ErrorCode> MasterService::MoveRevoke(
    const UUID& client_id, const std::string& key) {
    std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);
```

```cpp
    -> tl::expected<void, ErrorCode> {
    size_t metrics_dec_capacity = 0;  // to update the metrics

    std::shared_lock<std::shared_mutex> shared_lock(SNAPSHOT_MUTEX);
```
```cpp
tl::expected<std::vector<uint8_t>, SerializationError>
SegmentSerializer::Serialize() {
    if (!segment_manager_) {
        return tl::unexpected(SerializationError(
            ErrorCode::SERIALIZE_FAIL,
            "serialize SegmentManager segment_manager_ is null"));
    }

    if (segment_manager_->memory_allocator_ != BufferAllocatorType::OFFSET) {
        return tl::unexpected(SerializationError(
            ErrorCode::SERIALIZE_UNSUPPORTED,
            "serialize SegmentManager memory_allocator_ is not offset"));
    }

    msgpack::sbuffer sbuf;
    msgpack::packer<msgpack::sbuffer> packer(&sbuf);

    // Create map containing all data
    packer.pack_map(
        5);  // 5 fields: memory_allocator, mounted_segments, client_segments,
             // allocator_names, local_disk_segments

    // Serialize memory_allocator_
    packer.pack("ma");
    packer.pack(static_cast<int32_t>(segment_manager_->memory_allocator_));

    // Serialize allocator_manager_'s names_ order
    packer.pack("an");  // allocator_names
    const auto& names = segment_manager_->allocator_manager_.getNames();
    packer.pack_array(names.size());
    for (const auto& name : names) {
        packer.pack(name);
    }

    // Serialize mounted_segments_
    // Sort UUIDs first to ensure deterministic serialization results
    packer.pack("ms");
    packer.pack_map(segment_manager_->mounted_segments_.size());

    // Collect all segment UUIDs and sort
    std::vector<UUID> sorted_segment_uuids;
    sorted_segment_uuids.reserve(segment_manager_->mounted_segments_.size());
    for (const auto& pair : segment_manager_->mounted_segments_) {
        sorted_segment_uuids.push_back(pair.first);
    }
    std::sort(sorted_segment_uuids.begin(), sorted_segment_uuids.end());

    for (const auto& segment_uuid : sorted_segment_uuids) {
        const auto& mounted_segment =
            segment_manager_->mounted_segments_.at(segment_uuid);
        std::string uuid_str = UuidToString(segment_uuid);
        packer.pack(uuid_str);

        // Serialize MountedSegment object
        auto result =
            Serializer<MountedSegment>::serialize(mounted_segment, packer);
        if (!result) {
            return tl::unexpected(result.error());
        }
    }

    // Serialize client_segments_
    // Sort client UUIDs first to ensure deterministic serialization results
    packer.pack("cs");
    packer.pack_map(segment_manager_->client_segments_.size());

    // Collect all client UUIDs and sort
    std::vector<UUID> sorted_client_uuids;
    sorted_client_uuids.reserve(segment_manager_->client_segments_.size());
    for (const auto& pair : segment_manager_->client_segments_) {
        sorted_client_uuids.push_back(pair.first);
    }
    std::sort(sorted_client_uuids.begin(), sorted_client_uuids.end());

    for (const auto& client_uuid : sorted_client_uuids) {
        const auto& segment_ids =
            segment_manager_->client_segments_.at(client_uuid);
        std::string client_uuid_str = UuidToString(client_uuid);
        packer.pack(client_uuid_str);

        // Serialize segment IDs array
        packer.pack_array(segment_ids.size());

        for (const auto& segment_id : segment_ids) {
            std::string segment_uuid_str = UuidToString(segment_id);
            packer.pack(segment_uuid_str);
        }
    }

    // Serialize client_local_disk_segment_
    // Sort client UUIDs first to ensure deterministic serialization results
    packer.pack("ld");  // local_disk_segments
    packer.pack_map(segment_manager_->client_local_disk_segment_.size());

    // Collect all client UUIDs and sort
    std::vector<UUID> sorted_ld_uuids;
    sorted_ld_uuids.reserve(
        segment_manager_->client_local_disk_segment_.size());
    for (const auto& pair : segment_manager_->client_local_disk_segment_) {
        sorted_ld_uuids.push_back(pair.first);
    }
    std::sort(sorted_ld_uuids.begin(), sorted_ld_uuids.end());

    for (const auto& client_uuid : sorted_ld_uuids) {
        const auto& segment =
            segment_manager_->client_local_disk_segment_.at(client_uuid);
        packer.pack(UuidToString(client_uuid));

        // Serialize LocalDiskSegment: [enable_offloading, count, key1, ts1,
        // key2, ts2, ...] Sort keys to ensure determinism
        std::vector<std::string> sorted_keys;
        for (const auto& [key, ts] : segment->offloading_objects) {
            sorted_keys.push_back(key);
        }
        std::sort(sorted_keys.begin(), sorted_keys.end());

        packer.pack_array(2 + sorted_keys.size() * 2);
        packer.pack(segment->enable_offloading);
        packer.pack(static_cast<uint64_t>(sorted_keys.size()));

        for (const auto& key : sorted_keys) {
            packer.pack(key);
            packer.pack(segment->offloading_objects.at(key));
        }
    }

    // Compress entire data
    std::vector<uint8_t> compressed_data = zstd_compress(
        reinterpret_cast<const uint8_t*>(sbuf.data()), sbuf.size(), 3);

    // Return compressed data
    return std::vector<uint8_t>(
        std::make_move_iterator(compressed_data.begin()),
        std::make_move_iterator(compressed_data.end()));
```
```cpp
tl::expected<void, SerializationError> SegmentSerializer::Deserialize(
    const std::vector<uint8_t>& data) {
    // Decompress data
    std::vector<uint8_t> decompressed_data;
    try {
        decompressed_data = zstd_decompress(
            reinterpret_cast<const uint8_t*>(data.data()), data.size());
    } catch (const std::exception& e) {
        return tl::make_unexpected(
            SerializationError(ErrorCode::DESERIALIZE_FAIL,
                               "deserialize SegmentManager failed to "
                               "decompress MessagePack data: " +
                                   std::string(e.what())));
    }

    // Parse MessagePack data
    msgpack::object_handle oh;
    try {
        oh = msgpack::unpack(
            reinterpret_cast<const char*>(decompressed_data.data()),
            decompressed_data.size());
    } catch (const std::exception& e) {
        return tl::make_unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            "deserialize SegmentManager failed to unpack MessagePack data: " +
                std::string(e.what())));
    }

    const msgpack::object& obj = oh.get();

    // Check if it's a map
    if (obj.type != msgpack::type::MAP) {
        return tl::unexpected(
            SerializationError(ErrorCode::DESERIALIZE_FAIL,
                               "deserialize SegmentManager invalid MessagePack "
                               "format: expected map"));
    }

    if (!segment_manager_) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            "deserialize SegmentManager segment_manager is null"));
    }

    // Clear existing data
    segment_manager_->mounted_segments_.clear();
    segment_manager_->client_segments_.clear();

    // Convert MessagePack map to regular map, use pointers for values to avoid
    // copying
    std::map<std::string, const msgpack::object*> fields_map;
    for (uint32_t i = 0; i < obj.via.map.size; ++i) {
        const msgpack::object& key_obj = obj.via.map.ptr[i].key;
        const msgpack::object& value_obj = obj.via.map.ptr[i].val;

        // Ensure key is a string
        if (key_obj.type == msgpack::type::STR) {
            std::string key(key_obj.via.str.ptr, key_obj.via.str.size);
            fields_map.emplace(std::move(key), &value_obj);
        }
    }

    // Process fields in order
    // 1. First process memory_allocator_
    auto allocator_it = fields_map.find("ma");
    if (allocator_it != fields_map.end()) {
        const msgpack::object* value_obj = allocator_it->second;
        if (value_obj->type != msgpack::type::POSITIVE_INTEGER) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager memory_allocator is not int"));
        }

        auto allocatorType =
            static_cast<BufferAllocatorType>(value_obj->as<int32_t>());

        // Note: Type must match, only OffsetAllocator is supported
        if (allocatorType != segment_manager_->memory_allocator_) {
            LOG(ERROR) << "deserialize memory allocator type doesn't match "
                          "current setting";
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager memory allocator type doesn't "
                "match current setting"));
        }
    }

    // 1.5 Parse allocator_names (saved original names_ order)
    std::vector<std::string> saved_allocator_names;
    auto allocator_names_it = fields_map.find("an");
    if (allocator_names_it != fields_map.end()) {
        const msgpack::object* value_obj = allocator_names_it->second;
        if (value_obj->type != msgpack::type::ARRAY) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager allocator_names is not array"));
        }

        saved_allocator_names.reserve(value_obj->via.array.size);
        for (uint32_t i = 0; i < value_obj->via.array.size; ++i) {
            const msgpack::object& name_obj = value_obj->via.array.ptr[i];
            if (name_obj.type != msgpack::type::STR) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    "deserialize SegmentManager allocator_name is not string"));
            }
            saved_allocator_names.emplace_back(name_obj.via.str.ptr,
                                               name_obj.via.str.size);
        }
    }

    // 2. Process mounted_segments_
    auto mounted_segments_it = fields_map.find("ms");
    if (mounted_segments_it != fields_map.end()) {
        const msgpack::object* value_obj = mounted_segments_it->second;
        if (value_obj->type != msgpack::type::MAP) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager mounted_segments is not map"));
        }

        // Iterate through all mounted segments
        for (uint32_t j = 0; j < value_obj->via.map.size; ++j) {
            const msgpack::object& segment_key = value_obj->via.map.ptr[j].key;
            const msgpack::object& segment_value =
                value_obj->via.map.ptr[j].val;

            // Ensure key is a string (UUID)
            if (segment_key.type != msgpack::type::STR) {
                return tl::unexpected(
                    SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                       "deserialize SegmentManager "
                                       "mounted_segments key is not string"));
            }

            std::string uuid_str(segment_key.via.str.ptr,
                                 segment_key.via.str.size);
            UUID segment_uuid;
            if (!StringToUuid(uuid_str, segment_uuid)) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    fmt::format("deserialize SegmentManager mounted_segments "
                                "uuid {} is invalid",
                                uuid_str)));
            }

            // Deserialize MountedSegment object
            auto result =
                Serializer<MountedSegment>::deserialize(segment_value);
            if (!result) {
                return tl::unexpected(result.error());
            }

            segment_manager_->mounted_segments_.emplace(
                segment_uuid, std::move(result.value()));
        }
    }

    // 3. Process client_segments_
    auto client_segments_it = fields_map.find("cs");
    if (client_segments_it != fields_map.end()) {
        const msgpack::object* value_obj = client_segments_it->second;
        if (value_obj->type != msgpack::type::MAP) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager client_segments is not map"));
        }

        // Iterate through all client segments
        for (uint32_t j = 0; j < value_obj->via.map.size; ++j) {
            const msgpack::object& client_key = value_obj->via.map.ptr[j].key;
            const msgpack::object& client_value = value_obj->via.map.ptr[j].val;

            // Ensure key is a string (UUID)
            if (client_key.type != msgpack::type::STR) {
                return tl::unexpected(
                    SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                       "deserialize SegmentManager "
                                       "client_segments key is not string"));
            }

            std::string client_uuid_str(client_key.via.str.ptr,
                                        client_key.via.str.size);
            UUID client_uuid;
            if (!StringToUuid(client_uuid_str, client_uuid)) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    fmt::format("deserialize SegmentManager client_segments "
                                "client uuid {} is invalid",
                                client_uuid_str)));
            }

            // Ensure value is an array
            if (client_value.type != msgpack::type::ARRAY) {
                return tl::unexpected(
                    SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                       "deserialize SegmentManager "
                                       "client_segments value is not array"));
            }

            // Deserialize segment IDs array
            std::vector<UUID> segment_ids;
            segment_ids.reserve(client_value.via.array.size);

            for (uint32_t k = 0; k < client_value.via.array.size; ++k) {
                const msgpack::object& segment_id_obj =
                    client_value.via.array.ptr[k];

                if (segment_id_obj.type != msgpack::type::STR) {
                    return tl::unexpected(SerializationError(
                        ErrorCode::DESERIALIZE_FAIL,
                        "deserialize SegmentManager segment_id is not string"));
                }

                std::string segment_uuid_str(segment_id_obj.via.str.ptr,
                                             segment_id_obj.via.str.size);
                UUID segment_uuid;
                if (!StringToUuid(segment_uuid_str, segment_uuid)) {
                    return tl::unexpected(SerializationError(
                        ErrorCode::DESERIALIZE_FAIL,
                        fmt::format("deserialize SegmentManager segment_id {} "
                                    "is invalid",
                                    segment_uuid_str)));
                }

                segment_ids.push_back(segment_uuid);
            }

            segment_manager_->client_segments_.emplace(client_uuid,
                                                       std::move(segment_ids));
        }
    }

    // Restore allocator_manager_ based on mounted_segments_ and saved names
    // order
    segment_manager_->allocator_manager_ = AllocatorManager();

    // Add allocators in saved original order
    for (const auto& name : saved_allocator_names) {
        for (auto& pair : segment_manager_->mounted_segments_) {
            MountedSegment& mounted_segment = pair.second;
            if (mounted_segment.segment.name == name &&
                mounted_segment.status == SegmentStatus::OK &&
                mounted_segment.buf_allocator) {
                segment_manager_->allocator_manager_.addAllocator(
                    name, mounted_segment.buf_allocator);
                break;
            }
        }
    }

    // Rebuild client_by_name_ based on client_segments_ and mounted_segments_
    segment_manager_->client_by_name_.clear();
    for (const auto& [client_id, segment_ids] :
         segment_manager_->client_segments_) {
        for (const auto& segment_id : segment_ids) {
            auto it = segment_manager_->mounted_segments_.find(segment_id);
            if (it != segment_manager_->mounted_segments_.end()) {
                segment_manager_->client_by_name_[it->second.segment.name] =
                    client_id;
            }
        }
    }

    // 4. Process client_local_disk_segment_
    segment_manager_->client_local_disk_segment_.clear();
    auto ld_it = fields_map.find("ld");
    if (ld_it != fields_map.end()) {
        const msgpack::object* ld_obj = ld_it->second;
        if (ld_obj->type != msgpack::type::MAP) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                "deserialize SegmentManager local_disk_segments is not map"));
        }

        for (uint32_t j = 0; j < ld_obj->via.map.size; ++j) {
            const msgpack::object& client_key = ld_obj->via.map.ptr[j].key;
            const msgpack::object& client_value = ld_obj->via.map.ptr[j].val;

            // Parse client_id
            if (client_key.type != msgpack::type::STR) {
                return tl::unexpected(
                    SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                       "deserialize local_disk_segments "
                                       "client key is not string"));
            }

            std::string client_uuid_str(client_key.via.str.ptr,
                                        client_key.via.str.size);
            UUID client_id;
            if (!StringToUuid(client_uuid_str, client_id)) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    fmt::format("deserialize local_disk_segments "
                                "client uuid {} is invalid",
                                client_uuid_str)));
            }

            // Parse LocalDiskSegment array: [enable_offloading, count, key1,
            // ts1, ...]
            if (client_value.type != msgpack::type::ARRAY ||
                client_value.via.array.size < 2) {
                return tl::unexpected(
                    SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                       "deserialize local_disk_segments "
                                       "value is not valid array"));
            }

            bool enable_offloading = client_value.via.array.ptr[0].as<bool>();
            uint64_t count = client_value.via.array.ptr[1].as<uint64_t>();

            auto segment =
                std::make_shared<LocalDiskSegment>(enable_offloading);

            // Parse offloading_objects
            for (uint64_t k = 0; k < count; ++k) {
                size_t key_idx = 2 + k * 2;
                size_t ts_idx = 2 + k * 2 + 1;
                if (ts_idx >= client_value.via.array.size) {
                    return tl::unexpected(
                        SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                           "deserialize local_disk_segments "
                                           "offloading_objects out of bounds"));
                }

                std::string key(
                    client_value.via.array.ptr[key_idx].via.str.ptr,
                    client_value.via.array.ptr[key_idx].via.str.size);
                int64_t ts = client_value.via.array.ptr[ts_idx].as<int64_t>();
                segment->offloading_objects[key] = ts;
            }

            segment_manager_->client_local_disk_segment_[client_id] =
                std::move(segment);
        }
    }

    return {};
```
The SegmentSerializer::Deserialize method correctly handles decompression and MessagePack parsing. The reconstruction of allocator_manager_ and client_by_name_ based on the deserialized mounted_segments_ and client_segments_ is complex but necessary. The validation checks for MessagePack types and array sizes are good for robustness.
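The determinism the snapshot tests depend on comes from sorting keys before packing, since `unordered_map` iteration order is unspecified and varies between runs and builds. A self-contained sketch of that step (the helper name is illustrative):

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>
#include <vector>

// unordered_map iteration order is unspecified; sorting the keys first makes
// two snapshots of identical state byte-for-byte equal, which simplifies
// testing and change detection.
template <typename V>
std::vector<std::string> sorted_keys(
    const std::unordered_map<std::string, V>& m) {
    std::vector<std::string> keys;
    keys.reserve(m.size());
    for (const auto& [k, v] : m) keys.push_back(k);
    std::sort(keys.begin(), keys.end());
    return keys;
}
```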
```cpp
tl::expected<void, SerializationError>
Serializer<offset_allocator::__Allocator>::serialize(
    const offset_allocator::__Allocator& allocator, MsgpackPacker& packer) {
    // Use array instead of map for more compact storage
    // Array order is consistent with deserialize_msgpack
    packer.pack_array(10);

    // Basic properties (packed in order)
    packer.pack(allocator.m_size);
    packer.pack(allocator.m_current_capacity);
    packer.pack(allocator.m_max_capacity);
    packer.pack(allocator.m_freeStorage);
    packer.pack(allocator.m_usedBinsTop);

    // usedBins array
    packer.pack_array(offset_allocator::NUM_TOP_BINS);
    for (unsigned char m_usedBin : allocator.m_usedBins) {
        packer.pack(m_usedBin);
    }

    // binIndex array
    packer.pack_array(offset_allocator::NUM_LEAF_BINS);
    for (unsigned int m_binIndex : allocator.m_binIndices) {
        packer.pack(m_binIndex);
    }

    // nodes data serialization and compression
    std::vector<uint8_t> serialized_nodes;
    serialized_nodes.reserve(allocator.m_max_capacity * 25);

    for (uint32_t i = 0; i < allocator.m_current_capacity; i++) {
        const auto& node = allocator.m_nodes[i];
        serialized_nodes.push_back(node.used ? 1 : 0);
        SerializationHelper::serializeUint32(node.dataOffset, serialized_nodes);
        SerializationHelper::serializeUint32(node.dataSize, serialized_nodes);
        SerializationHelper::serializeUint32(node.binListPrev,
                                             serialized_nodes);
        SerializationHelper::serializeUint32(node.binListNext,
                                             serialized_nodes);
        SerializationHelper::serializeUint32(node.neighborPrev,
                                             serialized_nodes);
        SerializationHelper::serializeUint32(node.neighborNext,
                                             serialized_nodes);
    }

    try {
        std::vector<uint8_t> compressed_nodes =
            zstd_compress(serialized_nodes, 3);
        packer.pack(compressed_nodes);

        // freeNodes data serialization and compression
        std::vector<uint8_t> serialized_free_nodes;
        serialized_free_nodes.reserve(allocator.m_current_capacity * 4);

        for (uint32_t i = 0; i < allocator.m_current_capacity; i++) {
            SerializationHelper::serializeUint32(allocator.m_freeNodes[i],
                                                 serialized_free_nodes);
        }

        std::vector<uint8_t> compressed_free_nodes =
            zstd_compress(serialized_free_nodes, 3);
        packer.pack(compressed_free_nodes);
    } catch (const std::exception& e) {
        return tl::unexpected(SerializationError(
            ErrorCode::SERIALIZE_FAIL,
            std::string(
                "offset_allocator::__Allocator, error compressing nodes: ") +
                e.what()));
    }

    // freeOffset
    packer.pack(allocator.m_freeOffset);

    return {};
```
The Serializer<offset_allocator::__Allocator>::serialize method uses a custom binary format within MessagePack by manually serializing Node and NodeIndex fields using SerializationHelper::serializeUint32 and then compressing the raw byte vectors with ZSTD. While this might be efficient, it makes the serialization format tightly coupled to the Node struct's internal layout and the SerializationHelper's implementation. This approach is more brittle to changes and less readable than letting MessagePack handle more structured data directly (e.g., packing Node objects as MessagePack arrays or maps).
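For context, a fixed-width little-endian encoder/decoder pair like the one `SerializationHelper::serializeUint32` presumably implements looks like the sketch below (hedged: the actual helper's byte order and layout are not shown in the diff, and the function names here are illustrative). The coupling concern is that every field written this way bakes the struct layout into the on-disk format, so a format version field up front is cheap insurance:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Append a u32 as four little-endian bytes, independent of host endianness.
void put_u32_le(uint32_t v, std::vector<uint8_t>& out) {
    out.push_back(static_cast<uint8_t>(v));
    out.push_back(static_cast<uint8_t>(v >> 8));
    out.push_back(static_cast<uint8_t>(v >> 16));
    out.push_back(static_cast<uint8_t>(v >> 24));
}

// Read the four bytes back; caller must ensure off + 4 <= in.size().
uint32_t get_u32_le(const std::vector<uint8_t>& in, size_t off) {
    return static_cast<uint32_t>(in[off]) |
           static_cast<uint32_t>(in[off + 1]) << 8 |
           static_cast<uint32_t>(in[off + 2]) << 16 |
           static_cast<uint32_t>(in[off + 3]) << 24;
}
```

The MessagePack-idiomatic alternative the review suggests would instead pack each `Node` as a small msgpack array, letting the library handle widths and letting the format tolerate added fields.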
```cpp
tl::expected<std::unique_ptr<offset_allocator::__Allocator>, SerializationError>
Serializer<offset_allocator::__Allocator>::deserialize(
    const msgpack::object &obj) {
    // Check if object type is array (consistent with serialize_msgpack)
    if (obj.type != msgpack::type::ARRAY) {
        return tl::unexpected(
            SerializationError(ErrorCode::DESERIALIZE_FAIL,
                               "deserialize offset_allocator::__Allocator "
                               "invalid msgpack data, expected array"));
    }

    // Verify array size is correct
    if (obj.via.array.size != 10) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            fmt::format("deserialize offset_allocator::__Allocator invalid "
                        "array size: expected 9, got {}",
                        obj.via.array.size)));
    }

    auto *array_items = obj.via.array.ptr;
    size_t index = 0;

    // Deserialize basic properties
    uint32_t size = array_items[index++].as<uint32_t>();
    uint32_t current_capacity = array_items[index++].as<uint32_t>();
    uint32_t max_capacity = array_items[index++].as<uint32_t>();

    // Create allocator object
    auto allocator = std::make_unique<offset_allocator::__Allocator>(
        size, current_capacity, max_capacity);

    allocator->m_freeStorage = array_items[index++].as<uint32_t>();
    allocator->m_usedBinsTop = array_items[index++].as<uint32_t>();

    // Deserialize usedBins array
    const auto &used_bins_array = array_items[index++];
    if (used_bins_array.type != msgpack::type::ARRAY) {
        return tl::unexpected(
            SerializationError(ErrorCode::DESERIALIZE_FAIL,
                               "deserialize offset_allocator::__Allocator "
                               "usedBins is not an array"));
    }

    if (used_bins_array.via.array.size != offset_allocator::NUM_TOP_BINS) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            fmt::format("deserialize offset_allocator::__Allocator usedBins "
                        "invalid size: expected {}, got {}",
                        offset_allocator::NUM_TOP_BINS,
                        used_bins_array.via.array.size)));
    }

    for (uint32_t i = 0; i < used_bins_array.via.array.size &&
                         i < offset_allocator::NUM_TOP_BINS;
         i++) {
        allocator->m_usedBins[i] =
            used_bins_array.via.array.ptr[i].as<uint8_t>();
    }

    // Deserialize binIndices array
    const auto &bin_indices_array = array_items[index++];
    if (bin_indices_array.type != msgpack::type::ARRAY) {
        return tl::unexpected(
            SerializationError(ErrorCode::DESERIALIZE_FAIL,
                               "deserialize offset_allocator::__Allocator "
                               "binIndices is not an array"));
    }

    if (bin_indices_array.via.array.size != offset_allocator::NUM_LEAF_BINS) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            fmt::format("deserialize offset_allocator::__Allocator binIndices "
                        "invalid size: expected {}, got {}",
                        offset_allocator::NUM_LEAF_BINS,
                        bin_indices_array.via.array.size)));
    }

    for (uint32_t i = 0; i < bin_indices_array.via.array.size &&
                         i < offset_allocator::NUM_LEAF_BINS;
         i++) {
        allocator->m_binIndices[i] =
            bin_indices_array.via.array.ptr[i].as<uint32_t>();
    }

    try {
        // Deserialize compressed nodes data
        const auto &nodes_bin = array_items[index++];
        if (nodes_bin.type != msgpack::type::BIN) {
            return tl::unexpected(
                SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                   "deserialize offset_allocator::__Allocator "
                                   "nodes data is not binary"));
        }

        // Create copy of compressed data
        std::vector<uint8_t> compressed_data(
            reinterpret_cast<const uint8_t *>(nodes_bin.via.bin.ptr),
            reinterpret_cast<const uint8_t *>(nodes_bin.via.bin.ptr) +
                nodes_bin.via.bin.size);

        // Decompress data
        std::vector<uint8_t> serialized_nodes =
            zstd_decompress(compressed_data);

        // Verify decompressed data size is reasonable
        if (serialized_nodes.size() % 25 != 0) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                fmt::format("deserialize offset_allocator::__Allocator invalid "
                            "serialized nodes data size: expected multiple of "
                            "25, actual size: {}",
                            serialized_nodes.size())));
        }

        if (serialized_nodes.size() / 25 != current_capacity) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                fmt::format("deserialize offset_allocator::__Allocator invalid "
                            "serialized nodes data size: expected quotient of "
                            "{}, actual quotient: {}",
                            current_capacity, serialized_nodes.size() / 25)));
        }

        // Deserialize nodes array in standardized format
        size_t offset = 0;
        for (uint32_t i = 0; i < current_capacity; i++) {
            if (offset + 25 > serialized_nodes.size()) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    fmt::format("deserialize offset_allocator::__Allocator "
                                "incomplete serialized nodes data at index {}",
                                i)));
            }

            // Deserialize bool field
            allocator->m_nodes[i].used = (serialized_nodes[offset++] != 0);

            // Deserialize uint32_t field
            allocator->m_nodes[i].dataOffset =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
            allocator->m_nodes[i].dataSize =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
            allocator->m_nodes[i].binListPrev =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
            allocator->m_nodes[i].binListNext =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
            allocator->m_nodes[i].neighborPrev =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
            allocator->m_nodes[i].neighborNext =
                SerializationHelper::deserializeUint32(
                    &serialized_nodes[offset]);
            offset += 4;
        }
    } catch (const std::exception &e) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            fmt::format("deserialize offset_allocator::__Allocator error "
                        "decompressing nodes: {}",
                        e.what())));
    }

    try {
        // Deserialize compressed freeNodes data
        const auto &free_nodes_bin = array_items[index++];
        if (free_nodes_bin.type != msgpack::type::BIN) {
            return tl::unexpected(
                SerializationError(ErrorCode::DESERIALIZE_FAIL,
                                   "deserialize offset_allocator::__Allocator "
                                   "freeNodes data is not binary"));
        }

        // Create copy of compressed data
        std::vector<uint8_t> compressed_data(
            reinterpret_cast<const uint8_t *>(free_nodes_bin.via.bin.ptr),
            reinterpret_cast<const uint8_t *>(free_nodes_bin.via.bin.ptr) +
                free_nodes_bin.via.bin.size);

        // Decompress data
        std::vector<uint8_t> serialized_free_nodes =
            zstd_decompress(compressed_data);

        // Verify decompressed data size is reasonable
        if (serialized_free_nodes.size() != current_capacity * 4) {
            return tl::unexpected(SerializationError(
                ErrorCode::DESERIALIZE_FAIL,
                fmt::format(
                    "deserialize offset_allocator::__Allocator invalid "
                    "serialized free nodes data size: expected {}, actual {}",
                    current_capacity * 4, serialized_free_nodes.size())));
        }

        // Deserialize freeNodes array in standardized format
        size_t offset = 0;
        for (uint32_t i = 0; i < current_capacity; i++) {
            if (offset + 4 > serialized_free_nodes.size()) {
                return tl::unexpected(SerializationError(
                    ErrorCode::DESERIALIZE_FAIL,
                    fmt::format(
                        "deserialize offset_allocator::__Allocator incomplete "
                        "serialized free nodes data at index {}",
                        i)));
            }
            allocator->m_freeNodes[i] = SerializationHelper::deserializeUint32(
                &serialized_free_nodes[offset]);
            offset += 4;
        }
    } catch (const std::exception &e) {
        return tl::unexpected(SerializationError(
            ErrorCode::DESERIALIZE_FAIL,
            fmt::format("deserialize offset_allocator::__Allocator error "
                        "processing free nodes: {}",
                        e.what())));
    }

    // Deserialize freeOffset
    allocator->m_freeOffset = array_items[index++].as<uint32_t>();

    return allocator;
```
The Serializer<offset_allocator::__Allocator>::deserialize method manually deserializes the custom binary format for Node and NodeIndex fields after ZSTD decompression. This requires strict adherence to the serialization order and byte-level interpretation. Any mismatch between serialization and deserialization logic could lead to data corruption. Consider adding a version field to the serialized data to handle schema evolution more gracefully.
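One way to realize the version suggestion — not what the PR currently does, just a standalone sketch — is to prepend a format-version byte to the node blob before compression and reject unknown versions on load:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical version tag; bump on any change to the Node wire layout.
constexpr uint8_t kNodeFormatVersion = 1;

// Prepend a version byte so future readers can detect old layouts.
std::vector<uint8_t> withVersionHeader(const std::vector<uint8_t>& payload) {
    std::vector<uint8_t> out;
    out.reserve(payload.size() + 1);
    out.push_back(kNodeFormatVersion);
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

// Returns the payload only if the version is understood; nullopt otherwise,
// so a mismatched snapshot fails fast instead of deserializing garbage.
std::optional<std::vector<uint8_t>> stripVersionHeader(
    const std::vector<uint8_t>& blob) {
    if (blob.empty() || blob[0] != kNodeFormatVersion) {
        return std::nullopt;
    }
    return std::vector<uint8_t>(blob.begin() + 1, blob.end());
}
```

A single extra byte per blob buys an explicit error message instead of the silent data corruption the comment warns about.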
mooncake-store/include/types.h (Outdated)

```cpp
using UUID = std::pair<uint64_t, uint64_t>;

inline std::shared_mutex SNAPSHOT_MUTEX;  // Snapshot mutex
```
The introduction of a global SNAPSHOT_MUTEX is a significant design decision. While it's used to protect the state during snapshot operations, global mutable state can increase complexity in large codebases and make reasoning about thread safety more challenging. Consider encapsulating this mutex within the MasterService class or a dedicated snapshot manager if possible, to limit its scope and improve modularity.
Code Review
This pull request introduces a comprehensive snapshot and restore mechanism for the Master Service, a critical feature for reducing cache warm-up time and improving service availability. However, several critical security and stability issues have been identified. These include a potential deadlock in the snapshot child process, a path traversal vulnerability in the local file storage backend, and potential Denial of Service via unbounded memory allocation during decompression and S3 downloads, primarily stemming from the use of fork() in a multi-threaded environment and insufficient data validation. Additionally, a potential issue with a global mutex for snapshotting, memory optimization for large S3 uploads, and minor inconsistencies in the serialization code were noted.
```cpp
std::unique_lock<std::shared_mutex> lock(SNAPSHOT_MUTEX);
LOG(INFO) << "[Snapshot] Locking snapshot mutex, snapshot_id="
          << snapshot_id;
pid = fork();
```
The fork() call here is performed while holding an exclusive lock on SNAPSHOT_MUTEX. While this ensures consistency for metadata protected by this mutex, the child process subsequently attempts to serialize the TaskManager state in PersistState, which involves acquiring task_manager_.mutex_. Several methods in MasterService (such as FetchTasks, MarkTaskToComplete, and QueryTask) and the TaskCleanupThreadFunc acquire task_manager_.mutex_ without holding SNAPSHOT_MUTEX. If any of these threads hold the lock during fork(), the child process will deadlock when it attempts to acquire it during serialization.
To fix this, ensure that all code paths acquiring task_manager_.mutex_ also acquire a shared lock on SNAPSHOT_MUTEX to prevent forking while the lock is held.
To implement the fix with minimal impact, should we apply std::shared_lock<std::shared_mutex> at the entries of FetchTasks, MarkTaskToComplete, QueryTask, and TaskCleanupThreadFunc? This ensures that fork() (holding the unique lock) waits for all task_manager_.mutex_ holders to finish, ensuring a clean state in the child process.
QueryTask doesn't need a snapshot lock because it's a read-only function. Locking of SNAPSHOT_MUTEX has been added elsewhere.
```cpp
std::string LocalFileBackend::KeyToPath(const std::string& key) const {
    // key format: "master_snapshot/20231201_123456_000/metadata"
    // converts to: "/base_path/master_snapshot/20231201_123456_000/metadata"
    return base_path_ + "/" + key;
```
The LocalFileBackend implementation lacks path traversal protection in its UploadBuffer, DownloadBuffer, UploadString, and DownloadString methods. The KeyToPath method simply concatenates the base_path_ with the provided key. Since MasterService::RestoreState uses the content of latest.txt from the backend as a state_id to construct keys, a compromised backend could lead to arbitrary file read/write on the host system by injecting .. sequences or null bytes into latest.txt.
Implement strict validation for all keys and ensure the resolved canonical path remains within base_path_, similar to the check already present in DeleteObjectsWithPrefix.
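A containment check along the suggested lines can be sketched with `std::filesystem` (this is an illustrative guard, not the project's actual `KeyToPath`; the function name is hypothetical):

```cpp
#include <cassert>
#include <filesystem>
#include <optional>
#include <string>

namespace fs = std::filesystem;

// Resolve base_path/key and accept the result only if its canonical form
// is still inside base_path. Embedded NUL bytes are rejected up front.
std::optional<std::string> safeKeyToPath(const std::string& base_path,
                                         const std::string& key) {
    if (key.find('\0') != std::string::npos) return std::nullopt;
    fs::path base = fs::weakly_canonical(fs::path(base_path));
    fs::path candidate = fs::weakly_canonical(base / key);
    // lexically_relative yields a path beginning with ".." exactly when
    // the candidate escapes the base directory.
    fs::path rel = candidate.lexically_relative(base);
    if (rel.empty() || *rel.begin() == "..") return std::nullopt;
    return candidate.string();
}
```

Because `weakly_canonical` resolves symlinks in the existing prefix, this also defends against a symlink planted under `base_path_` pointing elsewhere.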
mooncake-store/include/types.h (Outdated)

```cpp
using UUID = std::pair<uint64_t, uint64_t>;

inline std::shared_mutex SNAPSHOT_MUTEX;  // Snapshot mutex
```
The SNAPSHOT_MUTEX is defined as a global inline variable. This can lead to contention and incorrect behavior if multiple MasterService instances are created within the same process (e.g., in tests), as they would all share the same lock. To improve encapsulation and prevent potential race conditions, this mutex should be a member of the MasterService class.
To fix this, you can add a mutable std::shared_mutex snapshot_mutex_; member to MasterService and replace all uses of the global SNAPSHOT_MUTEX with this new member.
+1 to Gemini's suggestion. Though running multiple MasterService instances in one process is rare in production, making SNAPSHOT_MUTEX a member adds negligible overhead (just one std::shared_mutex per instance).
Applied! Thanks for catching this.
```cpp
auto decompressed_data = zstd_decompress(
    reinterpret_cast<const uint8_t*>(shard_data_obj.via.bin.ptr),
    shard_data_obj.via.bin.size);
```
The MetadataSerializer::Deserialize method calls zstd_decompress without specifying a max_decompressed_size. This allows a malicious snapshot to specify an extremely large decompressed size in the zstd header, causing the Master Service to attempt a massive memory allocation and crash (OOM). A similar issue exists in SegmentSerializer::Deserialize in mooncake-store/src/segment.cpp.
Always use the zstd_decompress overload that accepts a max_decompressed_size parameter and enforce a reasonable limit (e.g., 1GB).
```cpp
        fmt::format("Invalid content length received: {}", size));
}

buffer.resize(static_cast<size_t>(size));
```
The S3Helper::DownloadBuffer and DownloadBufferMultipart methods resize the output buffer based on the Content-Length from S3 without any validation. A malicious or compromised S3 backend could return an extremely large Content-Length, leading to an OOM crash during the resize() call.
Implement a maximum allowed size for S3 downloads and verify the Content-Length against this limit before allocating memory.
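The guard can be as small as checking the reported length against a cap before allocating; a minimal sketch (the cap value and function name are illustrative, not from the PR):

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical cap; tune to the largest snapshot object you expect.
constexpr int64_t kMaxDownloadBytes = 1LL << 30;  // 1 GiB

// Validate a server-reported Content-Length before allocating for it,
// so a hostile backend cannot force an arbitrarily large resize().
void reserveForDownload(std::vector<uint8_t>& buffer, int64_t content_length) {
    if (content_length < 0 || content_length > kMaxDownloadBytes) {
        throw std::runtime_error("content length out of allowed range");
    }
    buffer.resize(static_cast<size_t>(content_length));
}
```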
```cpp
// Use non-blocking polling to wait
while (true) {
    // Read child logs first
    flush_child_logs();

    int status;
    pid_t result = waitpid(pid, &status, WNOHANG);

    if (result == -1) {
        LOG(ERROR) << "[Snapshot] Failed to wait for child process: "
                   << strerror(errno) << ", snapshot_id=" << snapshot_id
                   << ", child_pid=" << pid;
        MasterMetricManager::instance().inc_snapshot_fail();
        return;
    } else if (result == 0) {
        // Child process is still running
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                           std::chrono::steady_clock::now() - start_time)
                           .count();

        if (elapsed >= timeout_seconds) {
            // Timeout handling - flush remaining logs before killing
            flush_child_logs();
            if (!log_buffer.empty()) {
                LOG(INFO) << "[Snapshot:Child] " << log_buffer;
            }
            HandleChildTimeout(pid, snapshot_id);
            MasterMetricManager::instance().inc_snapshot_fail();
            return;
        }

        // Brief sleep before checking again
        std::this_thread::sleep_for(std::chrono::seconds(2));
    } else {
        // Child process has exited
        // Flush remaining logs from child
        flush_child_logs();
        // Output any remaining incomplete line
        if (!log_buffer.empty()) {
            LOG(INFO) << "[Snapshot:Child] " << log_buffer;
        }

        HandleChildExit(pid, status, snapshot_id);
        auto elapsed =
            std::chrono::duration_cast<std::chrono::milliseconds>(
                std::chrono::steady_clock::now() - start_time)
                .count();
        MasterMetricManager::instance().set_snapshot_duration_ms(elapsed);
        return;
    }
}
```
The waitpid with WNOHANG in a loop creates a busy-wait pattern with a fixed 2-second sleep. This is inefficient and can introduce delays in detecting child process termination. A more efficient approach would be to use select() or poll() to wait on both the pipe file descriptor for logs and the child process status. However, a simpler improvement is to use waitpid in a blocking manner on a separate thread, which would be more responsive and resource-friendly than polling.
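A sketch of the poll()-based alternative (POSIX-only and deliberately simplified: no snapshot timeout or line framing, and the function name is illustrative). poll() blocks until the pipe has data or the child closes its write end, so no fixed-interval sleep is needed:

```cpp
#include <cassert>
#include <poll.h>
#include <string>
#include <sys/wait.h>
#include <unistd.h>

// Wait for a child while draining its log pipe. read() returning 0 means
// every write end is closed, i.e. the child is done logging; only then do
// we block in waitpid() to reap it. Returns the raw wait status.
int waitChildDrainingLogs(pid_t pid, int pipe_rd, std::string& out) {
    struct pollfd pfd{pipe_rd, POLLIN, 0};
    char buf[4096];
    for (;;) {
        // A real implementation would pass the remaining snapshot timeout
        // here instead of -1 (block indefinitely).
        if (poll(&pfd, 1, -1) < 0) break;
        ssize_t n = read(pipe_rd, buf, sizeof(buf));
        if (n > 0) {
            out.append(buf, static_cast<size_t>(n));
        } else if (n == 0) {
            break;  // writer closed the pipe
        }
    }
    int status = 0;
    waitpid(pid, &status, 0);
    return status;
}
```

The parent must close its own copy of the write end before calling this, otherwise read() never returns 0.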
```cpp
return tl::unexpected(SerializationError(
    ErrorCode::DESERIALIZE_FAIL,
    fmt::format("deserialize offset_allocator::__Allocator invalid "
                "array size: expected 9, got {}",
                obj.via.array.size)));
```
There's a minor typo in the error message. The check is for an array size of 10, but the message says "expected 9". This should be corrected to avoid confusion during debugging.
```cpp
fmt::format("deserialize offset_allocator::__Allocator invalid "
            "array size: expected 10, got {}",
            obj.via.array.size)));
```

```cpp
if (obj.via.array.size < 8) {
    return tl::unexpected(SerializationError(
        ErrorCode::DESERIALIZE_FAIL,
        "deserialize MountedSegment invalid array size"));
}
```
The array size check obj.via.array.size < 8 is not strict enough. The serialization logic in Serializer<MountedSegment>::serialize always packs exactly 8 elements. For better data validation, it's recommended to check for exact equality using obj.via.array.size != 8.
```cpp
if (obj.via.array.size != 8) {
    return tl::unexpected(SerializationError(
        ErrorCode::DESERIALIZE_FAIL,
        "deserialize MountedSegment invalid array size"));
}
```

```cpp
// Use temporary stream
auto stream =
    Aws::MakeShared<Aws::StringStream>("UploadPart");
stream->write(reinterpret_cast<const char *>(part_data),
              current_part_size);
part_request.SetBody(stream);
```
The current implementation of UploadBufferMultipart creates an Aws::StringStream for each part, which involves copying the part data into a new in-memory buffer before uploading. For large files, this can double the memory consumption during the upload process.
To optimize this, you could implement a custom Aws::IOStream that reads directly from the source buffer slice (part_data) without making an intermediate copy. This can be achieved by creating a custom std::streambuf that wraps the existing memory buffer.
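The zero-copy idea can be sketched without the AWS SDK: a `std::streambuf` whose get area points directly at an existing buffer slice, which an `Aws::IOStream` subclass could wrap the same way (class names below are illustrative):

```cpp
#include <cassert>
#include <istream>
#include <streambuf>
#include <string>

// Read-only streambuf over an existing memory region: no copy is made,
// reads come straight from the caller's buffer slice.
class BufferSliceBuf : public std::streambuf {
   public:
    BufferSliceBuf(const char* data, size_t size) {
        char* p = const_cast<char*>(data);  // get area is never written
        setg(p, p, p + size);
    }
};

// Convenience wrapper pairing the streambuf with an istream interface.
class BufferSliceStream : public std::istream {
   public:
    BufferSliceStream(const char* data, size_t size)
        : std::istream(&buf_), buf_(data, size) {}

   private:
    BufferSliceBuf buf_;
};
```

The caveat is lifetime: the source buffer must outlive the stream for the duration of the upload, which the multipart loop already guarantees since it waits for each part to complete.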
00fish0 left a comment
Thanks for your contribution! I've left some comments below.
```cpp
// nodes data serialization and compression
std::vector<uint8_t> serialized_nodes;
serialized_nodes.reserve(allocator.m_max_capacity * 25);
```
25 is a magic number here. Maybe 1 + 6 * sizeof(uint32_t) is better for clarity.
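A named constant also lets the compiler guard the layout assumption; a small sketch of what that could look like (constant name is illustrative):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// 1 byte for the bool `used` flag plus six uint32_t fields per Node.
constexpr size_t kSerializedNodeSize = 1 + 6 * sizeof(uint32_t);
static_assert(kSerializedNodeSize == 25,
              "Node wire format changed; update snapshot (de)serialization");
```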
Fixed! Thanks for catching this.
```cpp
// 2. Download manifest.txt to parse protocol version info
std::string manifest_path = path_prefix + SNAPSHOT_MANIFEST_FILE;
std::string manifest_content;
if (!snapshot_backend_->DownloadString(manifest_path,
                                       manifest_content)) {
    LOG(ERROR) << "[Restore] Failed to download manifest file: "
               << manifest_path << " , starting fresh";
    return;
}
```
Manifest parsing only logs serializer_type and version but doesn’t enforce compatibility. If field order/size changes, old snapshots may deserialize into corrupted state with unclear errors.
Could we add a strict check and fail fast on mismatch?
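A fail-fast check could look like the following standalone sketch. The `key=value` manifest layout and field names are assumptions for illustration, not the PR's actual format:

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Parse "key=value" lines and accept the snapshot only if the serializer
// and version match what this binary can read; anything else fails fast.
bool manifestCompatible(const std::string& manifest,
                        const std::string& expected_serializer,
                        const std::string& expected_version) {
    std::map<std::string, std::string> kv;
    std::istringstream in(manifest);
    std::string line;
    while (std::getline(in, line)) {
        auto eq = line.find('=');
        if (eq != std::string::npos) {
            kv[line.substr(0, eq)] = line.substr(eq + 1);
        }
    }
    return kv["serializer_type"] == expected_serializer &&
           kv["version"] == expected_version;
}
```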
Good point! Currently, the serializer_type and version are validated at the start of the restore process.
```cpp
// Decompress data
std::vector<uint8_t> serialized_nodes =
    zstd_decompress(compressed_data);
```
Several deserialization paths in this file call zstd_decompress without a max output size.
There’s already a size-capped overload in zstd_util.h; can we use it here and define reasonable max sizes?
```cpp
return tl::unexpected(SerializationError(
    ErrorCode::DESERIALIZE_FAIL,
    fmt::format("deserialize offset_allocator::__Allocator invalid "
                "array size: expected 9, got {}",
```
Minor: The error message says 'expected 9'. Could we update the error message?
Fixed! Thanks for catching this.
```cpp
            "content={}",
            latest_path, snapshot_id, latest_content);

CleanupOldSnapshot(10, snapshot_id);
```
I have some thoughts regarding the snapshot retention policy. While keeping 10 snapshots is likely too many and will put significant pressure on etcd's storage space, I'm concerned that keeping only one might be risky due to:
Partial Writes: If a crash occurs during the snapshotting process, we might end up with a corrupted file.
Oplog Desynchronization: If the oplog isn't perfectly synced with the snapshot, we might need to replay from a previous, stable state.
What is the optimal number here? Perhaps 2 or 3 would be a better balance between safety and etcd's capacity constraints?
A new configuration option has been added: snapshot_retention_count, which sets the number of old snapshots to retain (default is 2).
```cpp
// Child process
// Close read end, set write end for logging
close(log_pipe[0]);
g_snapshot_log_pipe_fd = log_pipe[1];
```
If logs are large, the pipe can fill up and the child will block in write(), which can trigger a false timeout and kill.
Should we set the child’s pipe fd to non‑blocking (drop logs on EAGAIN) or otherwise avoid blocking writes?
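The non-blocking variant the comment proposes can be sketched as follows (function name is illustrative; a real version would also handle EINTR and partial lines):

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Put the fd into non-blocking mode, then write without ever stalling:
// when the pipe is full (EAGAIN), drop the rest of the log line instead
// of blocking the child. Returns true if the whole message was written.
bool writeLogNonBlocking(int fd, const std::string& msg) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags >= 0) fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    size_t off = 0;
    while (off < msg.size()) {
        ssize_t n = write(fd, msg.data() + off, msg.size() - off);
        if (n > 0) {
            off += static_cast<size_t>(n);
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return false;  // pipe full: drop rather than block
        } else {
            return false;  // real write error
        }
    }
    return true;
}
```

Dropped logs are visible as truncation on the parent side, which is a much better failure mode than a false timeout followed by a kill.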
```cpp
void MasterService::MetadataSerializer::Reset() {
    for (auto& shard : service_->metadata_shards_) {
        shard.metadata.clear();
    }
```
MetadataSerializer::Reset() only clears the metadata map. However, each shard also has processing_keys and replication_tasks, and there’s a global discarded_replicas_ list.
Should we clear them as part of Reset (or before restore) as well?
The reset function now includes handling for discarded_replicas_ and next_id_.
Since snapshots currently clean up dirty data by default, processing_keys and replication_tasks are not serialized and saved. If oplog features are to be supported in the future, these fields may require further processing.
```cpp
    return GetStorageConfigResponse(fsdir, enable_disk_eviction_, quota_bytes_);
}

auto MasterService::MountLocalDiskSegment(const UUID& client_id,
```
Should we add shared_lock(SNAPSHOT_MUTEX) in this function?
Applied! Thanks for catching this.
```cpp
    return ss.str();
}

tl::expected<UUID, ErrorCode> MasterService::CreateCopyTask(
```
CreateCopyTask, CreateMoveTask, FetchTasks, MarkTaskToComplete, and TaskCleanupThreadFunc do not hold shared_lock(SNAPSHOT_MUTEX), but PersistState does serialize task_manager state. This means a snapshot fork can occur while these functions are modifying task state, leading to an inconsistent snapshot where task_manager and metadata are out of sync (e.g., a pending copy task referencing a replica that was already cleaned up during restore). Consider either adding shared_lock(SNAPSHOT_MUTEX) to these functions, or clearing non-finished tasks during restore.
Applied! Thanks for catching this. I added shared_lock(SNAPSHOT_MUTEX) to these functions.
```cpp
const bool skip_cleanup = std::getenv(
    "MOONCAKE_MASTER_SERVICE_SNAPSHOT_TEST_SKIP_CLEANUP");
if (!skip_cleanup) {
```
After restore we unmount non‑OK segments, which invalidates allocators, but metadata isn’t immediately scanned for stale memory replicas. Cleanup only happens later on write paths via CleanupStaleHandles.
Would it make sense to call CleanupStaleHandles here?
CleanupStaleHandles is called inside the UnmountSegment function. Therefore, we do not need to call CleanupStaleHandles again.
…chTasks and MarkTaskToComplete functions
…ateMoveTask and CreateCopyTask
@maheshrbapatu @maheshreddybapatu This PR is relevant to you. PTAL if you're interested. Thanks!
Thanks for adding me @stmatengss. I will take a look at this.
Description
This PR implements a snapshot and restore mechanism for Master Service
Related RFC: #1150
Implementation
Snapshot Generation:
Snapshot Recovery:
Storage Backends:
Known Limitations
Design Limitations:
Implementation Limitations:
Acknowledgements
Special thanks to iFLYTEK for designing and implementing this feature.
Type of Change
How Has This Been Tested?
Run the following test binaries:
These tests reuse existing master_service_test and master_service_ssd_test test cases. After each test case completes, snapshot/restore is performed to create a new Master, then the following validations are executed:
Note:
Checklist
./scripts/code_format.sh before submitting.