[Store][Feature] Hot Standby and Oplog Interface for Master Service HA #1515
Libotry wants to merge 13 commits into kvcache-ai:main
Summary of Changes
Hello @Libotry, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the Master Service's High Availability capabilities by introducing a hot standby replication mechanism. It establishes a robust framework for replicating metadata changes from a primary instance to standby instances using an etcd-based operation log. This ensures data consistency and enables rapid failover, thereby improving the overall fault tolerance and reliability of the system. The changes encompass the entire replication pipeline, from logging operations to their application on standby nodes, complete with state management and comprehensive monitoring.
Code Review
This pull request introduces a comprehensive high-availability (HA) mechanism for the Master Service, centered around an etcd-based operational log (oplog). The changes are extensive and well-structured, adding key components like EtcdOpLogStore for persistence, OpLogManager for creating entries, OpLogWatcher for observing changes, OpLogApplier for applying them, and a StandbyStateMachine to manage the lifecycle of a hot standby instance. The design thoughtfully includes features for robustness, such as checksums, group commit, gap detection in the oplog, and a "read-then-watch" pattern for consistency.
My review identified a significant performance issue where an EtcdOpLogStore object is inefficiently created within a loop. I also found some opportunities for code improvement by removing duplicated helper functions and simplifying some logic. Overall, this is a strong feature addition with a solid design and good test coverage.
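As a rough illustration of the gap-detection idea mentioned in the review above, the sketch below checks whether a batch of entries continues contiguously from the last applied sequence ID. It is not the PR's code: the `Entry` struct and `FindFirstGap` function are hypothetical names, and the real `OpLogEntry` carries more fields. It assumes C++17 and entries sorted by ascending sequence ID.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical, simplified entry; the PR's OpLogEntry has more fields.
struct Entry {
    uint64_t sequence_id;
};

// Returns the first missing sequence ID if `entries` (sorted ascending) does
// not continue contiguously from `last_applied`, or std::nullopt if no gap.
std::optional<uint64_t> FindFirstGap(uint64_t last_applied,
                                     const std::vector<Entry>& entries) {
    uint64_t expected = last_applied + 1;
    for (const Entry& e : entries) {
        if (e.sequence_id < expected) continue;         // already applied
        if (e.sequence_id > expected) return expected;  // hole before this entry
        ++expected;
    }
    return std::nullopt;
}
```

A standby that sees a gap would typically stop applying and re-read the missing range from the log store before resuming the watch, which is the role the review attributes to the "read-then-watch" pattern.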
// Note: `/latest` is batch-updated on Primary, so this is for monitoring only.
#ifdef STORE_USE_ETCD
if (!cluster_id_.empty()) {
    EtcdOpLogStore oplog_store(cluster_id_, /*enable_latest_seq_batch_update=*/false);
The EtcdOpLogStore is created inside the ReplicationLoop's while loop. This is inefficient as it will be constructed and destructed on every iteration. The constructor for EtcdOpLogStore can be expensive as it may involve etcd operations. This object should be created once before the loop begins to avoid this repeated overhead.
For example:
void HotStandbyService::ReplicationLoop() {
    LOG(INFO) << "Replication loop started (etcd-based OpLog sync)";
#ifdef STORE_USE_ETCD
    std::unique_ptr<EtcdOpLogStore> oplog_store;
    if (!cluster_id_.empty()) {
        oplog_store = std::make_unique<EtcdOpLogStore>(cluster_id_, /*enable_latest_seq_batch_update=*/false);
    }
#endif
    while (IsRunning()) {
        // ...
#ifdef STORE_USE_ETCD
        if (oplog_store) {
            uint64_t latest_seq = 0;
            ErrorCode err = oplog_store->GetLatestSequenceId(latest_seq);
            if (err == ErrorCode::OK) {
                primary_seq_id_.store(latest_seq);
            }
        }
#endif
        // ...
    }
    // ...
}

// Base64 encoding for binary payload
// JsonCpp treats strings as UTF-8, so we must encode binary data
std::string Base64Encode(const std::string& data) {
    static const char base64_chars[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    result.reserve(((data.size() + 2) / 3) * 4);

    size_t i = 0;
    size_t data_len = data.size();

    // Process 3 bytes at a time
    while (i + 2 < data_len) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = static_cast<unsigned char>(data[i++]);
        uint32_t octet_c = static_cast<unsigned char>(data[i++]);

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back(base64_chars[(triple >> 6) & 0x3F]);
        result.push_back(base64_chars[triple & 0x3F]);
    }

    // Handle remaining bytes
    size_t remaining = data_len - i;
    if (remaining > 0) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = (remaining > 1) ? static_cast<unsigned char>(data[i++]) : 0;
        uint32_t octet_c = 0;

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back((remaining > 1) ? base64_chars[(triple >> 6) & 0x3F] : '=');
        result.push_back('=');
    }

    return result;
}

std::string Base64Decode(const std::string& encoded) {
    static const unsigned char decode_table[256] = {
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 62, 64, 64, 64, 63,
        52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 64, 64, 64, 64, 64, 64,
        64, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
        15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 64, 64, 64, 64, 64,
        64, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
        41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64
    };

    std::string result;
    result.reserve((encoded.size() * 3) / 4);

    size_t i = 0;
    while (i < encoded.size()) {
        // Skip whitespace and invalid chars
        while (i < encoded.size() && (encoded[i] == ' ' || encoded[i] == '\n' || encoded[i] == '\r' || encoded[i] == '\t')) {
            i++;
        }
        if (i >= encoded.size()) break;

        uint32_t sextet_a = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (i >= encoded.size() || sextet_a == 64) break;

        uint32_t sextet_b = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (sextet_b == 64) break;

        uint32_t sextet_c = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;
        uint32_t sextet_d = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;

        uint32_t triple = (sextet_a << 18) | (sextet_b << 12) |
                          ((sextet_c != 64) ? (sextet_c << 6) : 0) |
                          ((sextet_d != 64) ? sextet_d : 0);

        result.push_back(static_cast<char>((triple >> 16) & 0xFF));
        if (sextet_c != 64) {
            result.push_back(static_cast<char>((triple >> 8) & 0xFF));
        }
        if (sextet_d != 64) {
            result.push_back(static_cast<char>(triple & 0xFF));
        }
    }

    return result;
}

}  // namespace
The Base64Encode and Base64Decode helper functions are also implemented in mooncake-store/src/oplog_watcher.cpp. This code duplication should be avoided. Consider moving these functions to a common utility file (e.g., utils.h/.cpp) and calling them from both places to improve maintainability. Using a well-tested third-party library for Base64 encoding/decoding would be even more robust if one is available in the project's dependencies.
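One possible shape for such a shared utility is sketched below as a header-only helper that both files could include. The namespace `base64_util` and the function names are hypothetical and not taken from the PR. Unlike the helper shown in the diff, this `Decode` returns `std::nullopt` on malformed input instead of silently truncating; that is a design choice of this sketch, not the PR's behavior. It assumes C++17 and an ASCII-compatible character set.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>

namespace base64_util {  // hypothetical namespace, not from the PR

inline constexpr char kAlphabet[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Encodes arbitrary bytes as standard, padded Base64.
inline std::string Encode(const std::string& data) {
    auto byte = [&](size_t k) {
        return static_cast<uint32_t>(static_cast<unsigned char>(data[k]));
    };
    std::string out;
    out.reserve(((data.size() + 2) / 3) * 4);
    for (size_t i = 0; i < data.size(); i += 3) {
        const size_t n = data.size() - i < 3 ? data.size() - i : 3;
        uint32_t triple = byte(i) << 16;
        if (n > 1) triple |= byte(i + 1) << 8;
        if (n > 2) triple |= byte(i + 2);
        out.push_back(kAlphabet[(triple >> 18) & 0x3F]);
        out.push_back(kAlphabet[(triple >> 12) & 0x3F]);
        out.push_back(n > 1 ? kAlphabet[(triple >> 6) & 0x3F] : '=');
        out.push_back(n > 2 ? kAlphabet[triple & 0x3F] : '=');
    }
    return out;
}

// Maps a Base64 character to its 6-bit value, or -1 if not in the alphabet.
inline int DecodeChar(char c) {
    if (c >= 'A' && c <= 'Z') return c - 'A';
    if (c >= 'a' && c <= 'z') return c - 'a' + 26;
    if (c >= '0' && c <= '9') return c - '0' + 52;
    if (c == '+') return 62;
    if (c == '/') return 63;
    return -1;
}

// Decodes padded Base64; returns std::nullopt on malformed input.
inline std::optional<std::string> Decode(const std::string& in) {
    if (in.size() % 4 != 0) return std::nullopt;
    std::string out;
    out.reserve(in.size() / 4 * 3);
    for (size_t i = 0; i < in.size(); i += 4) {
        // Padding is only legal at the end of the final 4-character group.
        int pad = 0;
        if (i + 4 == in.size() && in[i + 3] == '=') {
            pad = (in[i + 2] == '=') ? 2 : 1;
        }
        uint32_t triple = 0;
        for (int j = 0; j < 4; ++j) {
            int v = 0;
            if (j < 4 - pad) {
                v = DecodeChar(in[i + j]);
                if (v < 0) return std::nullopt;
            }
            triple = (triple << 6) | static_cast<uint32_t>(v);
        }
        out.push_back(static_cast<char>((triple >> 16) & 0xFF));
        if (pad < 2) out.push_back(static_cast<char>((triple >> 8) & 0xFF));
        if (pad < 1) out.push_back(static_cast<char>(triple & 0xFF));
    }
    return out;
}

}  // namespace base64_util
```

With a single definition, a round-trip test over binary payloads (including embedded `\0` and `0xFF` bytes) only needs to be written once and covers both call sites.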
MetadataPayload payload;
bool parse_success = false;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result == struct_pack::errc::ok) {
    parse_success = true;
} else {
    LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
               << ", sequence_id=" << entry.sequence_id
               << ", payload_size=" << entry.payload.size()
               << ", error_code=" << static_cast<int>(result);
}

if (!parse_success) {
    // Fallback to empty metadata if parsing fails
    StandbyObjectMetadata empty_metadata;
    empty_metadata.last_sequence_id = entry.sequence_id;
    metadata_store_->PutMetadata(entry.object_key, empty_metadata);
    return;
}
The logic for deserializing the payload and handling failure can be simplified. The parse_success boolean variable is redundant. You can directly check the result of struct_pack::deserialize_to and handle the failure case within the same if block.
MetadataPayload payload;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result != struct_pack::errc::ok) {
    LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
               << ", sequence_id=" << entry.sequence_id
               << ", payload_size=" << entry.payload.size()
               << ", error_code=" << static_cast<int>(result);
    // Fallback to empty metadata if parsing fails
    StandbyObjectMetadata empty_metadata;
    empty_metadata.last_sequence_id = entry.sequence_id;
    metadata_store_->PutMetadata(entry.object_key, empty_metadata);
    return;
}

6021d94 to 7a63de5
5d23699 to dc40185
mooncake-store/src/oplog_watcher.cpp (Outdated)
    std::vector<OpLogEntry>& entries,
    EtcdRevisionId& revision_id) {
#ifdef STORE_USE_ETCD
    EtcdOpLogStore oplog_store(cluster_id_,
The EtcdOpLogStore constructor performs etcd I/O (checks the /latest key) and starts the BatchWriteThread background thread. ReadOpLogSince is called multiple times (within a loop) in StartFromSequenceId and SyncMissedEntries, and creating and destroying it each time incurs significant overhead. It is recommended to lazily initialize it as a class member variable, similar to the approach of OpLogApplier::GetEtcdOpLogStore().
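A minimal sketch of the lazy-initialization idea follows. It does not reproduce `OpLogApplier::GetEtcdOpLogStore()`, whose implementation is not shown in this conversation; `FakeOpLogStore`, `Watcher`, and `GetStore` are hypothetical stand-ins, and the fake store only counts constructions so the effect of caching can be observed. It assumes C++17.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for EtcdOpLogStore. The real constructor performs
// etcd I/O and starts a background thread; this one only counts calls.
struct FakeOpLogStore {
    static inline int constructions = 0;
    explicit FakeOpLogStore(std::string cluster_id)
        : cluster_id_(std::move(cluster_id)) {
        ++constructions;
    }
    std::string cluster_id_;
};

// Sketch of a watcher that builds its store on first use and then reuses it.
class Watcher {
public:
    explicit Watcher(std::string cluster_id)
        : cluster_id_(std::move(cluster_id)) {}

    // Returns the cached store, constructing it on the first call only.
    // The mutex guards against two threads racing on the first call.
    FakeOpLogStore* GetStore() {
        std::lock_guard<std::mutex> lock(mu_);
        if (!store_) {
            store_ = std::make_unique<FakeOpLogStore>(cluster_id_);
        }
        return store_.get();
    }

private:
    std::string cluster_id_;
    std::mutex mu_;
    std::unique_ptr<FakeOpLogStore> store_;
};
```

With this shape, repeated calls to a method such as `ReadOpLogSince` would pay the construction cost once per watcher rather than once per call.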
Cool! I believe this is another step toward our HA capabilities, which many users have been eagerly expecting. I’ll take a close look together with @00fish0.
0994319 to 4480702
Thanks for your great work! I'm currently reviewing these changes and expect to finish the review in 2-3 days. Will get back to you soon.