diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index f022d29c44e..6becdfd62ed 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -284,6 +284,10 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { onSet(c.lg, c.version) for _, m := range c.members { + if c.localID == m.ID { + setIsLearnerMetric(m) + } + c.lg.Info( "recovered/added member from store", zap.String("cluster-id", c.cid.String()), @@ -393,6 +397,11 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { if c.v2store != nil { mustSaveMemberToStore(c.lg, c.v2store, m) } + + if m.ID == c.localID { + setIsLearnerMetric(m) + } + if c.be != nil && shouldApplyV3 { c.be.MustSaveMemberToBackend(m) @@ -505,6 +514,10 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { mustUpdateMemberInStore(c.lg, c.v2store, &m) } + if id == c.localID { + isLearner.Set(0) + } + if c.be != nil && shouldApplyV3 { c.members[id].RaftAttributes.IsLearner = false c.updateMembershipMetric(id, true) diff --git a/server/etcdserver/api/membership/metrics.go b/server/etcdserver/api/membership/metrics.go index f08763779f0..7eb6fb6b728 100644 --- a/server/etcdserver/api/membership/metrics.go +++ b/server/etcdserver/api/membership/metrics.go @@ -32,9 +32,24 @@ var ( }, []string{"Local", "Remote"}, ) + isLearner = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "is_learner", + Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.", + }) ) +func setIsLearnerMetric(m *Member) { + if m.IsLearner { + isLearner.Set(1) + } else { + isLearner.Set(0) + } +} + func init() { prometheus.MustRegister(ClusterVersionMetrics) prometheus.MustRegister(knownPeers) + prometheus.MustRegister(isLearner) } diff --git a/server/etcdserver/metrics.go b/server/etcdserver/metrics.go index 7af0e7354d0..4be44a57a6c 100644 --- a/server/etcdserver/metrics.go +++ b/server/etcdserver/metrics.go @@ -44,12 +44,6 @@ var ( Name: "leader_changes_seen_total", Help: "The number of leader changes seen.", }) - isLearner = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "etcd", - Subsystem: "server", - Name: "is_learner", - Help: "Whether or not this member is a learner. 1 if is, 0 otherwise.", - }) learnerPromoteFailed = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "etcd", Subsystem: "server", @@ -171,7 +165,6 @@ func init() { prometheus.MustRegister(currentVersion) prometheus.MustRegister(currentGoVersion) prometheus.MustRegister(serverID) - prometheus.MustRegister(isLearner) prometheus.MustRegister(learnerPromoteSucceed) prometheus.MustRegister(learnerPromoteFailed) prometheus.MustRegister(fdUsed) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 59ae7b739b7..83951434c8d 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2060,15 +2060,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con } } - // update the isLearner metric when this server id is equal to the id in raft member confChange - if confChangeContext.Member.ID == s.MemberId() { - if cc.Type == raftpb.ConfChangeAddLearnerNode { - isLearner.Set(1) - } else { - isLearner.Set(0) - } - } - case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.cluster.RemoveMember(id, shouldApplyV3) diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index 312b5aab5e2..1ed85f5bbfa 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -15,11 +15,14 @@ package e2e import ( + "context" "fmt" "testing" + "time" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -37,6 +40,18 @@ func TestV3MetricsInsecure(t *testing.T) { testCtl(t, metricsTest) } +func TestV3LearnerMetricRecover(t *testing.T) { + cfg := e2e.NewConfigTLS() + cfg.ServerConfig.SnapshotCount = 10 + testCtl(t, learnerMetricRecoverTest, withCfg(*cfg)) +} + +func TestV3LearnerMetricApplyFromSnapshotTest(t *testing.T) { + cfg := e2e.NewConfigTLS() + cfg.ServerConfig.SnapshotCount = 10 + testCtl(t, learnerMetricApplyFromSnapshotTest, withCfg(*cfg)) +} + func metricsTest(cx ctlCtx) { if err := ctlV3Put(cx, "k", "v", ""); err != nil { cx.t.Fatal(err) @@ -69,3 +84,64 @@ func metricsTest(cx ctlCtx) { } } } + +func learnerMetricRecoverTest(cx ctlCtx) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := cx.epc.StartNewProc(ctx, nil, cx.t, true /* addAsLearner */); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) + + triggerSnapshot(ctx, cx) + + // Restart cluster + if err := cx.epc.Restart(ctx); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) +} + +func learnerMetricApplyFromSnapshotTest(cx ctlCtx) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Add learner but do not start it + _, learnerCfg, err := cx.epc.AddMember(ctx, nil, cx.t, true /* addAsLearner */) + if err != nil { + cx.t.Fatal(err) + } + + triggerSnapshot(ctx, cx) + + // Start the learner + if err = cx.epc.StartNewProcFromConfig(ctx, cx.t, learnerCfg); err != nil { + cx.t.Fatal(err) + } + expectLearnerMetrics(cx) +} + +func triggerSnapshot(ctx context.Context, cx ctlCtx) { + etcdctl := cx.epc.Procs[0].Etcdctl() + for i := 0; i < int(cx.epc.Cfg.ServerConfig.SnapshotCount); i++ { + if err := etcdctl.Put(ctx, "k", "v", config.PutOptions{}); err != nil { + cx.t.Fatal(err) + } + } +} + +func expectLearnerMetrics(cx ctlCtx) { + expectLearnerMetric(cx, 0, "etcd_server_is_learner 0") + expectLearnerMetric(cx, 1, "etcd_server_is_learner 1") +} + +func expectLearnerMetric(cx ctlCtx, procIdx int, expectMetric string) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + args := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[procIdx], "GET", e2e.CURLReq{Endpoint: "/metrics"}) + if err := e2e.SpawnWithExpectsContext(ctx, args, nil, expect.ExpectedResponse{Value: expectMetric}); err != nil { + cx.t.Fatal(err) + } +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 1815a0a9d6e..87e29a705f7 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -780,7 +780,21 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr // Phase 1 - Inform cluster of new configuration // Phase 2 - Start new member func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, err error) { - var serverCfg *EtcdServerProcessConfig + memberID, serverCfg, err := epc.AddMember(ctx, cfg, tb, addAsLearner, opts...) + if err != nil { + return 0, err + } + + // Then start process + if err = epc.StartNewProcFromConfig(ctx, tb, serverCfg); err != nil { + return 0, err + } + + return memberID, nil +} + +// AddMember adds a new member to the cluster without starting it. +func (epc *EtcdProcessCluster) AddMember(ctx context.Context, cfg *EtcdProcessClusterConfig, tb testing.TB, addAsLearner bool, opts ...config.ClientOption) (memberID uint64, serverCfg *EtcdServerProcessConfig, err error) { if cfg != nil { serverCfg = cfg.EtcdServerProcessConfig(tb, epc.nextSeq) } else { @@ -808,20 +822,24 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces resp, err = memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()}) } if err != nil { - return 0, fmt.Errorf("failed to add new member: %w", err) + return 0, nil, fmt.Errorf("failed to add new member: %w", err) } - // Then start process + return resp.Member.ID, serverCfg, nil +} + +// StartNewProcFromConfig starts a new member process from the given config. +func (epc *EtcdProcessCluster) StartNewProcFromConfig(ctx context.Context, tb testing.TB, serverCfg *EtcdServerProcessConfig) error { tb.Log("start new member") proc, err := NewEtcdProcess(tb, serverCfg) if err != nil { epc.Close() - return 0, fmt.Errorf("cannot configure: %v", err) + return fmt.Errorf("cannot configure: %v", err) } epc.Procs = append(epc.Procs, proc) - return resp.Member.ID, proc.Start(ctx) + return proc.Start(ctx) } // UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical