[Store][Feature] Hot Standby and Oplog Interface for Master Service HA #1515

Open
Libotry wants to merge 13 commits into kvcache-ai:main from Libotry:br_main_0207_standby_oplog_interface

Libotry commented Feb 7, 2026

Description

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@gemini-code-assist

Summary of Changes

Hello @Libotry, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Master Service's High Availability capabilities by introducing a hot standby replication mechanism. It establishes a robust framework for replicating metadata changes from a primary instance to standby instances using an etcd-based operation log. This ensures data consistency and enables rapid failover, thereby improving the overall fault tolerance and reliability of the system. The changes encompass the entire replication pipeline, from logging operations to their application on standby nodes, complete with state management and comprehensive monitoring.

Highlights

  • Hot Standby Service Introduction: Introduces a comprehensive Hot Standby service for the Master Service, enabling High Availability (HA) through etcd-backed OpLog replication and state management.
  • OpLog Management and Persistence: Adds OpLogManager for in-memory operation logging and EtcdOpLogStore for durable persistence of OpLog entries to etcd, supporting batch writes (group commit) and fixed-width sequence ID keys.
  • OpLog Replication and Application: Implements OpLogWatcher to monitor etcd for new OpLog entries and OpLogApplier to apply these entries to the Standby's local metadata store, ensuring global ordering, handling out-of-order entries, and resolving missing entries with retries and timeouts.
  • State Management and Metrics: Integrates a StandbyStateMachine for robust lifecycle management of the Hot Standby service and HAMetricManager for collecting and exposing Prometheus-compatible metrics on replication status, lag, and errors.
  • Data Integrity and Validation: Incorporates xxHash for fast checksum computation on OpLog entry payloads, includes size validation for keys and payloads, and adds uint64_t sequence ID comparison utilities to handle wrap-around scenarios safely.
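
The wrap-around-safe sequence ID comparison mentioned in the last highlight can be pictured with serial-number arithmetic. The following is only a minimal sketch under that assumption: the names `SeqLess`, `SeqLessOrEqual`, and `SeqDistance` are hypothetical and are not taken from the PR's `types.h`, whose actual utilities may be written differently.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers (not the PR's API). `a` counts as "before" `b` when the
// forward distance from a to b, computed modulo 2^64, is below half the range.
inline bool SeqLess(uint64_t a, uint64_t b) {
    return a != b && (b - a) < (uint64_t{1} << 63);
}

inline bool SeqLessOrEqual(uint64_t a, uint64_t b) {
    return a == b || SeqLess(a, b);
}

// Forward distance from `from` to `to`; unsigned subtraction wraps naturally.
inline uint64_t SeqDistance(uint64_t from, uint64_t to) {
    assert(SeqLessOrEqual(from, to));  // caller must pass IDs in order
    return to - from;
}
```

With these definitions, an ID just before the wrap point still compares as older than a small ID just after it. Two IDs exactly 2^63 apart are ambiguous, and this sketch treats neither as before the other.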

Changelog
  • dependencies.sh
    • Added libxxhash-dev to system packages, providing a fast hashing library for checksum computations.
  • mooncake-store/include/etcd_oplog_store.h
    • Added EtcdOpLogStore class for storing and retrieving OpLog entries in etcd, including batch update mechanisms for the latest sequence ID and group commit for OpLog writes.
  • mooncake-store/include/ha_metric_manager.h
    • Added HAMetricManager singleton for collecting and exposing High Availability (HA) related metrics, such as OpLog sequence tracking, replication lag, error counters, and performance histograms.
  • mooncake-store/include/hot_standby_service.h
    • Added HotStandbyService class to manage standby replication, including connecting to a primary, applying OpLog entries, verifying data consistency, and promoting to primary status.
  • mooncake-store/include/metadata_store.h
    • Added MetadataStore abstract interface and StandbyObjectMetadata struct to define how metadata is stored and managed on standby nodes.
  • mooncake-store/include/oplog_applier.h
    • Added OpLogApplier class responsible for applying OpLog entries to the standby metadata store, ensuring correct ordering, handling missing entries, and performing data validation.
  • mooncake-store/include/oplog_manager.h
    • Added OpLogManager class for managing in-memory operation logs, including sequence ID allocation, checksum computation, and size validation for OpLog entries.
  • mooncake-store/include/oplog_watcher.h
    • Added OpLogWatcher class to watch etcd for OpLog changes, perform initial synchronization, and forward events to the OpLogApplier with robust reconnection logic.
  • mooncake-store/include/snapshot_provider.h
    • Added SnapshotProvider abstract interface for loading metadata snapshots, facilitating faster bootstrap of standby nodes.
  • mooncake-store/include/standby_state_machine.h
    • Added StandbyStateMachine class to manage the lifecycle and state transitions of the Hot Standby service, providing a clear state model for operational clarity.
  • mooncake-store/include/types.h
    • Added utility functions for uint64_t sequence ID comparison to safely handle wrap-around scenarios.
    • Introduced IsValidClusterIdComponent for validating etcd cluster IDs to prevent key-prefix injection.
    • Included ylt/struct_pack.hpp for efficient binary serialization of metadata payloads.
  • mooncake-store/src/CMakeLists.txt
    • Updated source file list to include new HA-related C++ files.
    • Configured CMake to find and link the xxHash library.
  • mooncake-store/src/etcd_oplog_store.cpp
    • Implemented EtcdOpLogStore functionalities, including fixed-width key generation, Base64 encoding/decoding for binary payloads, and background threads for batch updates and group commits.
  • mooncake-store/src/ha_metric_manager.cpp
    • Implemented HAMetricManager to initialize and manage Prometheus-style metrics for HA components.
  • mooncake-store/src/hot_standby_service.cpp
    • Implemented HotStandbyService logic, including StandbyMetadataStore, snapshot bootstrap, and the main replication and verification loops.
  • mooncake-store/src/oplog_applier.cpp
    • Implemented OpLogApplier logic, focusing on ordered application of OpLog entries, handling pending entries, and resolving gaps by requesting missing data from etcd.
  • mooncake-store/src/oplog_manager.cpp
    • Implemented OpLogManager with xxHash for checksums and prefix hashes, and logic for persisting entries to EtcdOpLogStore.
  • mooncake-store/src/oplog_watcher.cpp
    • Implemented OpLogWatcher with initial sync from etcd, event handling, and reconnection strategies, ensuring consistent watch resumption.
  • mooncake-store/src/standby_state_machine.cpp
    • Implemented StandbyStateMachine with defined state transitions, history tracking, and callback mechanisms for state changes.
  • mooncake-store/tests/hot_standby_ut/etcd_oplog_store_test.cpp
    • Added unit tests for EtcdOpLogStore covering CRUD operations, serialization, fencing, sequence ID management, batch updates, cleanup, and cluster ID validation.
  • mooncake-store/tests/hot_standby_ut/ha_metric_manager_test.cpp
    • Added unit tests for HAMetricManager to verify metric updates, serialization, singleton behavior, and concurrent access.
  • mooncake-store/tests/hot_standby_ut/hot_standby_service_test.cpp
    • Added unit tests for HotStandbyService covering start/stop, state transitions, sync status, promotion, warm start, and metadata operations.
  • mooncake-store/tests/hot_standby_ut/oplog_applier_test.cpp
    • Added unit tests for OpLogApplier covering basic apply, sequence ordering, gap resolution, checksums, size validation, recovery, pending entries, and JSON parsing.
  • mooncake-store/tests/hot_standby_ut/oplog_manager_test.cpp
    • Added unit tests for OpLogManager covering append, sequence ID allocation, persistence, checksums, size validation, and concurrent operations.
  • mooncake-store/tests/hot_standby_ut/oplog_watcher_test.cpp
    • Added unit tests for OpLogWatcher covering start/stop, event handling, reconnection, checksum/size validation, and state callbacks.
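
Two ideas from the changelog, fixed-width sequence ID keys and cluster ID validation against key-prefix injection, can be illustrated together. This is a hedged sketch, not the PR's code: the `/oplog/` prefix, the allowed character set, and the names `IsSafeKeyComponent` and `MakeOpLogKey` are assumptions made for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical validator: accept only [A-Za-z0-9_-] so a cluster ID cannot
// contain '/' and spill into another key prefix. The PR's rules may differ.
inline bool IsSafeKeyComponent(const std::string& s) {
    if (s.empty()) return false;
    for (unsigned char c : s) {
        const bool ok = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
                        (c >= '0' && c <= '9') || c == '_' || c == '-';
        if (!ok) return false;
    }
    return true;
}

// Hypothetical key layout: zero-pad the sequence ID to 20 digits (the width of
// the largest uint64_t) so lexicographic key order matches numeric order.
inline std::string MakeOpLogKey(const std::string& cluster_id, uint64_t seq) {
    assert(IsSafeKeyComponent(cluster_id));
    char buf[21];  // 20 digits plus the terminating NUL
    std::snprintf(buf, sizeof(buf), "%020llu",
                  static_cast<unsigned long long>(seq));
    return "/oplog/" + cluster_id + "/" + buf;
}
```

The padding matters because etcd range reads return keys in byte order: without it, the key for sequence 10 would sort before the key for sequence 9.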

gemini-code-assist bot left a comment

Code Review

This pull request introduces a comprehensive high-availability (HA) mechanism for the Master Service, centered around an etcd-based operational log (oplog). The changes are extensive and well-structured, adding key components like EtcdOpLogStore for persistence, OpLogManager for creating entries, OpLogWatcher for observing changes, OpLogApplier for applying them, and a StandbyStateMachine to manage the lifecycle of a hot standby instance. The design thoughtfully includes features for robustness, such as checksums, group commit, gap detection in the oplog, and a "read-then-watch" pattern for consistency.

My review identified a significant performance issue where an EtcdOpLogStore object is inefficiently created within a loop. I also found some opportunities for code improvement by removing duplicated helper functions and simplifying some logic. Overall, this is a strong feature addition with a solid design and good test coverage.
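
The ordered application with gap detection described in the review summary can be sketched as a small buffer keyed by sequence ID. This is an illustration under stated assumptions, not the PR's `OpLogApplier`: the `Entry` struct and `OrderedApplier` class are hypothetical, and the sketch ignores sequence ID wrap-around, retries, and timeouts.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified entry; the PR's OpLogEntry carries more fields.
struct Entry {
    uint64_t sequence_id;
    std::string object_key;
};

class OrderedApplier {
public:
    explicit OrderedApplier(uint64_t next_expected)
        : next_expected_(next_expected) {}

    // Buffers `e` and returns every entry that is now applicable, in order.
    std::vector<Entry> Offer(Entry e) {
        std::vector<Entry> ready;
        if (e.sequence_id < next_expected_) return ready;  // stale duplicate
        const uint64_t seq = e.sequence_id;
        pending_.emplace(seq, std::move(e));
        auto it = pending_.begin();
        while (it != pending_.end() && it->first == next_expected_) {
            ready.push_back(std::move(it->second));
            it = pending_.erase(it);
            ++next_expected_;
        }
        // Anything still buffered must lie beyond a missing sequence ID.
        assert(pending_.empty() || pending_.begin()->first > next_expected_);
        return ready;
    }

    // True while buffered entries wait on a missing earlier sequence ID.
    bool HasGap() const { return !pending_.empty(); }
    uint64_t next_expected() const { return next_expected_; }

private:
    uint64_t next_expected_;
    std::map<uint64_t, Entry> pending_;
};
```

In this sketch a caller would treat `HasGap()` as the signal to fetch the missing entry from the log store; once that entry is offered, the buffered successors are released in the same call.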

// Note: `/latest` is batch-updated on Primary, so this is for monitoring only.
#ifdef STORE_USE_ETCD
if (!cluster_id_.empty()) {
    EtcdOpLogStore oplog_store(cluster_id_, /*enable_latest_seq_batch_update=*/false);

high

The EtcdOpLogStore is created inside the ReplicationLoop's while loop. This is inefficient as it will be constructed and destructed on every iteration. The constructor for EtcdOpLogStore can be expensive as it may involve etcd operations. This object should be created once before the loop begins to avoid this repeated overhead.

For example:

void HotStandbyService::ReplicationLoop() {
    LOG(INFO) << "Replication loop started (etcd-based OpLog sync)";

#ifdef STORE_USE_ETCD
    std::unique_ptr<EtcdOpLogStore> oplog_store;
    if (!cluster_id_.empty()) {
        oplog_store = std::make_unique<EtcdOpLogStore>(cluster_id_, /*enable_latest_seq_batch_update=*/false);
    }
#endif

    while (IsRunning()) {
        // ...
#ifdef STORE_USE_ETCD
        if (oplog_store) {
            uint64_t latest_seq = 0;
            ErrorCode err = oplog_store->GetLatestSequenceId(latest_seq);
            if (err == ErrorCode::OK) {
                primary_seq_id_.store(latest_seq);
            }
        }
#endif
        // ...
    }
    // ...
}

Comment on lines 567 to 667
// Base64 encoding for binary payload
// JsonCpp treats strings as UTF-8, so we must encode binary data
std::string Base64Encode(const std::string& data) {
    static const char base64_chars[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    result.reserve(((data.size() + 2) / 3) * 4);

    size_t i = 0;
    size_t data_len = data.size();

    // Process 3 bytes at a time
    while (i + 2 < data_len) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = static_cast<unsigned char>(data[i++]);
        uint32_t octet_c = static_cast<unsigned char>(data[i++]);

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back(base64_chars[(triple >> 6) & 0x3F]);
        result.push_back(base64_chars[triple & 0x3F]);
    }

    // Handle remaining bytes
    size_t remaining = data_len - i;
    if (remaining > 0) {
        uint32_t octet_a = static_cast<unsigned char>(data[i++]);
        uint32_t octet_b = (remaining > 1) ? static_cast<unsigned char>(data[i++]) : 0;
        uint32_t octet_c = 0;

        uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c;

        result.push_back(base64_chars[(triple >> 18) & 0x3F]);
        result.push_back(base64_chars[(triple >> 12) & 0x3F]);
        result.push_back((remaining > 1) ? base64_chars[(triple >> 6) & 0x3F] : '=');
        result.push_back('=');
    }

    return result;
}

std::string Base64Decode(const std::string& encoded) {
    static const unsigned char decode_table[256] = {
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 62, 64, 64, 64, 63,
        52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 64, 64, 64, 64, 64, 64,
        64, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
        15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 64, 64, 64, 64, 64,
        64, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
        41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64,
        64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64, 64
    };

    std::string result;
    result.reserve((encoded.size() * 3) / 4);

    size_t i = 0;
    while (i < encoded.size()) {
        // Skip whitespace and invalid chars
        while (i < encoded.size() && (encoded[i] == ' ' || encoded[i] == '\n' || encoded[i] == '\r' || encoded[i] == '\t')) {
            i++;
        }
        if (i >= encoded.size()) break;

        uint32_t sextet_a = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (i >= encoded.size() || sextet_a == 64) break;

        uint32_t sextet_b = decode_table[static_cast<unsigned char>(encoded[i++])];
        if (sextet_b == 64) break;

        uint32_t sextet_c = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;
        uint32_t sextet_d = (i < encoded.size()) ? decode_table[static_cast<unsigned char>(encoded[i++])] : 64;

        uint32_t triple = (sextet_a << 18) | (sextet_b << 12) |
                          ((sextet_c != 64) ? (sextet_c << 6) : 0) |
                          ((sextet_d != 64) ? sextet_d : 0);

        result.push_back(static_cast<char>((triple >> 16) & 0xFF));
        if (sextet_c != 64) {
            result.push_back(static_cast<char>((triple >> 8) & 0xFF));
        }
        if (sextet_d != 64) {
            result.push_back(static_cast<char>(triple & 0xFF));
        }
    }

    return result;
}

} // namespace

medium

The Base64Encode and Base64Decode helper functions are also implemented in mooncake-store/src/oplog_watcher.cpp. This code duplication should be avoided. Consider moving these functions to a common utility file (e.g., utils.h/.cpp) and calling them from both places to improve maintainability. Using a well-tested third-party library for Base64 encoding/decoding would be even more robust if one is available in the project's dependencies.

Comment on lines 454 to 472
MetadataPayload payload;
bool parse_success = false;
auto result = struct_pack::deserialize_to(payload, entry.payload);
if (result == struct_pack::errc::ok) {
    parse_success = true;
} else {
    LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
               << ", sequence_id=" << entry.sequence_id
               << ", payload_size=" << entry.payload.size()
               << ", error_code=" << static_cast<int>(result);
}

if (!parse_success) {
    // Fallback to empty metadata if parsing fails
    StandbyObjectMetadata empty_metadata;
    empty_metadata.last_sequence_id = entry.sequence_id;
    metadata_store_->PutMetadata(entry.object_key, empty_metadata);
    return;
}

medium

The logic for deserializing the payload and handling failure can be simplified. The parse_success boolean variable is redundant. You can directly check the result of struct_pack::deserialize_to and handle the failure case within the same if block.

    MetadataPayload payload;
    auto result = struct_pack::deserialize_to(payload, entry.payload);
    if (result != struct_pack::errc::ok) {
        LOG(ERROR) << "OpLogApplier: failed to deserialize payload for key=" << entry.object_key
                   << ", sequence_id=" << entry.sequence_id
                   << ", payload_size=" << entry.payload.size()
                   << ", error_code=" << static_cast<int>(result);
        // Fallback to empty metadata if parsing fails
        StandbyObjectMetadata empty_metadata;
        empty_metadata.last_sequence_id = entry.sequence_id;
        metadata_store_->PutMetadata(entry.object_key, empty_metadata);
        return;
    }

Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 6021d94 to 7a63de5 (February 7, 2026 09:43)
codecov-commenter commented Feb 7, 2026

00fish0 self-assigned this Feb 8, 2026
Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 5d23699 to dc40185 (February 9, 2026 02:39)
std::vector<OpLogEntry>& entries,
EtcdRevisionId& revision_id) {
#ifdef STORE_USE_ETCD
EtcdOpLogStore oplog_store(cluster_id_,
Contributor Author

The EtcdOpLogStore constructor performs etcd I/O (checks the /latest key) and starts the BatchWriteThread background thread. ReadOpLogSince is called multiple times (within a loop) in StartFromSequenceId and SyncMissedEntries, and creating and destroying it each time incurs significant overhead. It is recommended to lazily initialize it as a class member variable, similar to the approach of OpLogApplier::GetEtcdOpLogStore().
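
The lazy-initialization approach recommended above can be sketched generically. This is a minimal illustration, not the PR's `OpLogApplier::GetEtcdOpLogStore()`: the `LazyMember` template and its factory parameter are hypothetical, and a real member would construct an `EtcdOpLogStore` with the service's `cluster_id_`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical wrapper: builds the wrapped object on first use and then
// returns the same instance, so an expensive constructor runs at most once.
template <typename T>
class LazyMember {
public:
    explicit LazyMember(std::function<std::unique_ptr<T>()> factory)
        : factory_(std::move(factory)) {
        assert(factory_);  // an empty factory would throw on first Get()
    }

    // Thread-safe accessor. If the factory returns nullptr (construction
    // failed), the next call tries again instead of caching the failure.
    T* Get() {
        std::lock_guard<std::mutex> lock(mu_);
        if (!instance_) instance_ = factory_();
        return instance_.get();
    }

private:
    std::mutex mu_;
    std::function<std::unique_ptr<T>()> factory_;
    std::unique_ptr<T> instance_;
};
```

Holding such a wrapper as a class member would let each `ReadOpLogSince` call reuse one store, paying the etcd I/O and background-thread startup described in the comment only on first use.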

ykwd commented Feb 11, 2026

Cool! I believe this is another step toward our HA capabilities, which many users have been eagerly expecting. I’ll take a close look together with @00fish0.

Libotry force-pushed the br_main_0207_standby_oplog_interface branch from 0994319 to 4480702 (February 12, 2026 03:32)

00fish0 commented Feb 13, 2026

Thanks for your great work! I'm currently reviewing these changes and expect to finish the review in 2-3 days. Will get back to you soon.
