Skip to content

Commit

Permalink
Run a separate in memory snapshot to reduce number of entries stored …
Browse files Browse the repository at this point in the history
…in raft memory storage

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Nov 2, 2024
1 parent 3de0018 commit d0cff01
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 34 deletions.
6 changes: 0 additions & 6 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,12 +899,6 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
zap.Bool("is-learner", m.IsLearner),
)
}
for id := range c.removed {
//We do not need to delete the member since the store is empty.
Expand Down
84 changes: 57 additions & 27 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 10
)

var (
Expand Down Expand Up @@ -291,9 +292,10 @@ type EtcdServer struct {
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}

Expand Down Expand Up @@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

type etcdProgress struct {
confState raftpb.ConfState
snapi uint64
appliedt uint64
appliedi uint64
confState raftpb.ConfState
diskSnapshotIndex uint64
memorySnapshotIndex uint64
appliedt uint64
appliedi uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand Down Expand Up @@ -809,10 +812,11 @@ func (s *EtcdServer) run() {
s.r.start(rh)

ep := etcdProgress{
confState: sn.Metadata.ConfState,
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
confState: sn.Metadata.ConfState,
diskSnapshotIndex: sn.Metadata.Index,
memorySnapshotIndex: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
}

defer func() {
Expand Down Expand Up @@ -998,15 +1002,15 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
lg := s.Logger()
lg.Info(
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
)
defer func() {
lg.Info(
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand All @@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
if toApply.snapshot.Metadata.Index <= ep.appliedi {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
Expand Down Expand Up @@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {

ep.appliedt = toApply.snapshot.Metadata.Term
ep.appliedi = toApply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
ep.confState = toApply.snapshot.Metadata.ConfState

// As backends and implementations like alarmsStore changed, we need
Expand Down Expand Up @@ -1188,31 +1193,37 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
}

func (s *EtcdServer) ForceSnapshot() {
s.forceSnapshot = true
s.forceDiskSnapshot = true
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
if !s.shouldSnapshotToDisk(ep) {
if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
s.snapshotToMemory(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.memorySnapshotIndex = ep.appliedi
}
return
}
//TODO: Remove disk snapshot in v3.7
lg := s.Logger()
lg.Info(
"triggering snapshot",
zap.String("local-member-id", s.MemberID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
zap.Bool("snapshot-forced", s.forceSnapshot),
zap.Bool("snapshot-forced", s.forceDiskSnapshot),
)
s.forceSnapshot = false
s.forceDiskSnapshot = false

s.snapshot(ep.appliedi, ep.confState)
s.snapshotToDisk(ep.appliedi, ep.confState)
s.compactRaftLog(ep.appliedi)
ep.snapi = ep.appliedi
ep.diskSnapshotIndex = ep.appliedi
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
Expand Down Expand Up @@ -2132,7 +2143,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
Expand Down Expand Up @@ -2174,6 +2185,25 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
)
}

func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)

lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if errorspkg.Is(err, raft.ErrSnapOutOfDate) {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
}

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
lg := s.Logger()

Expand All @@ -2189,10 +2219,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
if snapi <= s.Cfg.SnapshotCatchUpEntries {
return
}

compacti = snapi - s.Cfg.SnapshotCatchUpEntries
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) {
}
}()

srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}})
srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}})
<-ch
if len(st.Action()) != 0 {
t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
Expand Down

0 comments on commit d0cff01

Please sign in to comment.