Skip to content

Commit

Permalink
Merge pull request #17176 from YaoC/fix-learner-metric
Browse files Browse the repository at this point in the history
server: fix learner metric incorrect issue
  • Loading branch information
serathius authored Jan 12, 2024
2 parents 8b9e179 + f7ab7ad commit 40f22e9
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 21 deletions.
13 changes: 13 additions & 0 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
onSet(c.lg, c.version)

for _, m := range c.members {
if c.localID == m.ID {
setIsLearnerMetric(m)
}

c.lg.Info(
"recovered/added member from store",
zap.String("cluster-id", c.cid.String()),
Expand Down Expand Up @@ -393,6 +397,11 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
}

if m.ID == c.localID {
setIsLearnerMetric(m)
}

if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(m)

Expand Down Expand Up @@ -505,6 +514,10 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, &m)
}

if id == c.localID {
isLearner.Set(0)
}

if c.be != nil && shouldApplyV3 {
c.members[id].RaftAttributes.IsLearner = false
c.updateMembershipMetric(id, true)
Expand Down
15 changes: 15 additions & 0 deletions server/etcdserver/api/membership/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,24 @@ var (
},
[]string{"Local", "Remote"},
)
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "is_learner",
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
})
)

// setIsLearnerMetric publishes the learner status of m on the isLearner gauge:
// the gauge is set to 1 when m.IsLearner is true and to 0 otherwise.
func setIsLearnerMetric(m *Member) {
	// The zero value already encodes "not a learner".
	var gaugeValue float64
	if m.IsLearner {
		gaugeValue = 1
	}
	isLearner.Set(gaugeValue)
}

// init registers this package's collectors with the default Prometheus
// registry. MustRegister panics if any registration fails.
func init() {
	// Collectors are registered in the listed order.
	prometheus.MustRegister(
		ClusterVersionMetrics,
		knownPeers,
		isLearner,
	)
}
7 changes: 0 additions & 7 deletions server/etcdserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ var (
Name: "leader_changes_seen_total",
Help: "The number of leader changes seen.",
})
isLearner = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "is_learner",
Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.",
})
learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "server",
Expand Down Expand Up @@ -171,7 +165,6 @@ func init() {
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)
prometheus.MustRegister(isLearner)
prometheus.MustRegister(learnerPromoteSucceed)
prometheus.MustRegister(learnerPromoteFailed)
prometheus.MustRegister(fdUsed)
Expand Down
9 changes: 0 additions & 9 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,15 +2060,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}
}

// update the isLearner metric when this server id is equal to the id in raft member confChange
if confChangeContext.Member.ID == s.MemberId() {
if cc.Type == raftpb.ConfChangeAddLearnerNode {
isLearner.Set(1)
} else {
isLearner.Set(0)
}
}

case raftpb.ConfChangeRemoveNode:
id := types.ID(cc.NodeID)
s.cluster.RemoveMember(id, shouldApplyV3)
Expand Down
76 changes: 76 additions & 0 deletions tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package e2e

import (
"context"
"fmt"
"testing"
"time"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

Expand All @@ -37,6 +40,18 @@ func TestV3MetricsInsecure(t *testing.T) {
testCtl(t, metricsTest)
}

// TestV3LearnerMetricRecover runs learnerMetricRecoverTest against a TLS
// cluster configuration whose SnapshotCount is lowered to 10.
func TestV3LearnerMetricRecover(t *testing.T) {
	tlsCfg := e2e.NewConfigTLS()
	// A small snapshot count keeps the number of puts issued by the test low.
	tlsCfg.ServerConfig.SnapshotCount = 10

	clusterOpt := withCfg(*tlsCfg)
	testCtl(t, learnerMetricRecoverTest, clusterOpt)
}

// TestV3LearnerMetricApplyFromSnapshotTest runs
// learnerMetricApplyFromSnapshotTest against a TLS cluster configuration whose
// SnapshotCount is lowered to 10.
func TestV3LearnerMetricApplyFromSnapshotTest(t *testing.T) {
	tlsCfg := e2e.NewConfigTLS()
	// A small snapshot count keeps the number of puts issued by the test low.
	tlsCfg.ServerConfig.SnapshotCount = 10

	clusterOpt := withCfg(*tlsCfg)
	testCtl(t, learnerMetricApplyFromSnapshotTest, clusterOpt)
}

func metricsTest(cx ctlCtx) {
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
cx.t.Fatal(err)
Expand Down Expand Up @@ -69,3 +84,64 @@ func metricsTest(cx ctlCtx) {
}
}
}

// learnerMetricRecoverTest adds and starts a new member as a learner, checks
// the learner metric on processes 0 and 1, issues SnapshotCount puts (intended
// to trigger a snapshot), restarts the cluster, and checks the metric again.
func learnerMetricRecoverTest(cx ctlCtx) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The returned member ID is not needed by this test.
	_, err := cx.epc.StartNewProc(ctx, nil, cx.t, true /* addAsLearner */)
	if err != nil {
		cx.t.Fatal(err)
	}
	expectLearnerMetrics(cx)

	triggerSnapshot(ctx, cx)

	// Restart cluster; the metric must report the same values afterwards.
	err = cx.epc.Restart(ctx)
	if err != nil {
		cx.t.Fatal(err)
	}
	expectLearnerMetrics(cx)
}

// learnerMetricApplyFromSnapshotTest registers a learner member without
// starting its process, issues SnapshotCount puts (intended to trigger a
// snapshot), then starts the learner and checks the learner metric on
// processes 0 and 1.
func learnerMetricApplyFromSnapshotTest(cx ctlCtx) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Add learner but do not start it
	_, learnerCfg, addErr := cx.epc.AddMember(ctx, nil, cx.t, true /* addAsLearner */)
	if addErr != nil {
		cx.t.Fatal(addErr)
	}

	triggerSnapshot(ctx, cx)

	// Start the learner
	startErr := cx.epc.StartNewProcFromConfig(ctx, cx.t, learnerCfg)
	if startErr != nil {
		cx.t.Fatal(startErr)
	}
	expectLearnerMetrics(cx)
}

// triggerSnapshot writes the key "k" with value "v" through the first
// process's etcdctl client, once per unit of the configured SnapshotCount.
// Any put error fails the test immediately.
func triggerSnapshot(ctx context.Context, cx ctlCtx) {
	client := cx.epc.Procs[0].Etcdctl()
	for remaining := int(cx.epc.Cfg.ServerConfig.SnapshotCount); remaining > 0; remaining-- {
		err := client.Put(ctx, "k", "v", config.PutOptions{})
		if err != nil {
			cx.t.Fatal(err)
		}
	}
}

// expectLearnerMetrics checks that process 0 reports
// "etcd_server_is_learner 0" and process 1 reports "etcd_server_is_learner 1".
func expectLearnerMetrics(cx ctlCtx) {
	// The slice index doubles as the process index.
	wantByProc := []string{
		"etcd_server_is_learner 0",
		"etcd_server_is_learner 1",
	}
	for procIdx, metric := range wantByProc {
		expectLearnerMetric(cx, procIdx, metric)
	}
}

// expectLearnerMetric issues a GET for "/metrics" against the process at
// procIdx and fails the test unless expectMetric is matched in the output
// within the 5-second context timeout.
func expectLearnerMetric(cx ctlCtx, procIdx int, expectMetric string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	target := cx.epc.Procs[procIdx]
	curlArgs := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, target, "GET", e2e.CURLReq{Endpoint: "/metrics"})
	want := expect.ExpectedResponse{Value: expectMetric}

	err := e2e.SpawnWithExpectsContext(ctx, curlArgs, nil, want)
	if err != nil {
		cx.t.Fatal(err)
	}
}
28 changes: 23 additions & 5 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,21 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
// Phase 1 - Inform cluster of new configuration
// Phase 2 - Start new member
func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, err error) {
	// memberID and err are the named results, so serverCfg must be the one new
	// name introduced by ":=". Declaring serverCfg separately beforehand would
	// leave no new variable on the left-hand side and fail to compile.
	memberID, serverCfg, err := epc.AddMember(ctx, cfg, tb, addAsLearner, opts...)
	if err != nil {
		return 0, err
	}

	// Then start process
	if err = epc.StartNewProcFromConfig(ctx, tb, serverCfg); err != nil {
		// The member ID is deliberately not reported when the start fails.
		return 0, err
	}

	return memberID, nil
}

// AddMember adds a new member to the cluster without starting it.
func (epc *EtcdProcessCluster) AddMember(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, serverCfg *EtcdServerProcessConfig, err error) {
if cfg != nil {
serverCfg = cfg.EtcdServerProcessConfig(tb, epc.nextSeq)
} else {
Expand Down Expand Up @@ -808,20 +822,24 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
resp, err = memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()})
}
if err != nil {
return 0, fmt.Errorf("failed to add new member: %w", err)
return 0, nil, fmt.Errorf("failed to add new member: %w", err)
}

// Then start process
return resp.Member.ID, serverCfg, nil
}

// StartNewProcFromConfig starts a new member process from the given config.
//
// The process is built with NewEtcdProcess, appended to epc.Procs, and then
// started; the result of proc.Start is returned as-is. If the process cannot
// be built, epc.Close is called and a "cannot configure" error is returned
// without touching epc.Procs.
func (epc *EtcdProcessCluster) StartNewProcFromConfig(ctx context.Context, tb testing.TB, serverCfg *EtcdServerProcessConfig) error {
	tb.Log("start new member")
	proc, err := NewEtcdProcess(tb, serverCfg)
	if err != nil {
		epc.Close()
		return fmt.Errorf("cannot configure: %v", err)
	}

	// The process is recorded before Start is called, so it stays in epc.Procs
	// even when Start returns an error.
	epc.Procs = append(epc.Procs, proc)

	return proc.Start(ctx)
}

// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical
Expand Down

0 comments on commit 40f22e9

Please sign in to comment.