diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 388eb4e0755..2e9b9965d45 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -303,14 +303,14 @@ func TestDropReadUnderNetworkPartition(t *testing.T) { clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3]) kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil) - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) _, err = kvc.Get(ctx, "a") cancel() if err != rpctypes.ErrLeaderChanged { t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err) } - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) _, err = kvc.Get(ctx, "a") cancel() if err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index fc070ce51a7..90473e666ce 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -213,7 +213,8 @@ type EtcdServer struct { // done is closed when all goroutines from start() complete. done chan struct{} // leaderChanged is used to notify the linearizable read loop to drop the old read requests. - leaderChanged chan struct{} + leaderChanged chan struct{} + leaderChangedMu sync.RWMutex errorc chan error id types.ID @@ -754,7 +755,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() - s.leaderChanged = make(chan struct{}, 1) + s.leaderChanged = make(chan struct{}) if s.ClusterVersion() != nil { if lg != nil { lg.Info( @@ -942,10 +943,11 @@ func (s *EtcdServer) run() { } } if newLeader { - select { - case s.leaderChanged <- struct{}{}: - default: - } + s.leaderChangedMu.Lock() + lc := s.leaderChanged + s.leaderChanged = make(chan struct{}) + s.leaderChangedMu.Unlock() + close(lc) } // TODO: remove the nil checking // current test utility does not provide the stats @@ -1696,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 { return atomic.LoadUint64(&s.lead) } +func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { + s.leaderChangedMu.RLock() + defer s.leaderChangedMu.RUnlock() + return s.leaderChanged +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 5bcb7fc32d0..d425b634d9a 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -634,9 +634,9 @@ func (s *EtcdServer) linearizableReadLoop() { ctxToSend := make([]byte, 8) id1 := s.reqIDGen.Next() binary.BigEndian.PutUint64(ctxToSend, id1) - + leaderChangedNotifier := s.leaderChangedNotify() select { - case <-s.leaderChanged: + case <-leaderChangedNotifier: continue case <-s.readwaitc: case <-s.stopping: @@ -694,7 +694,7 @@ func (s *EtcdServer) linearizableReadLoop() { } slowReadIndex.Inc() } - case <-s.leaderChanged: + case <-leaderChangedNotifier: timeout = true readIndexFailed.Inc() // return a retryable error. diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 8a3d93cdab7..92899b6707f 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -565,12 +565,14 @@ func TestV3LeaseFailover(t *testing.T) { md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) - defer cancel() lac, err := lc.LeaseKeepAlive(ctx) if err != nil { t.Fatal(err) } - defer lac.CloseSend() + defer func() { + lac.CloseSend() + cancel() + }() // send keep alive to old leader until the old leader starts // to drop lease request.