Skip to content

Commit

Permalink
Safeguard lease operations by double checking the leadership
Browse files Browse the repository at this point in the history
1. ignore old leader's leases revoking request
2. double check current member's leadership before perform lease renew request
3. etcdserve: ensure current member's leadership before performing lease checkpoint request

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Dec 15, 2023
1 parent f7e488d commit 67f1716
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 7 deletions.
44 changes: 43 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
if !srv.ensureLeadership() {
srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(srv.MemberId())))
return lease.ErrNotPrimary
}

srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
return nil
})
}

Expand Down Expand Up @@ -844,7 +851,19 @@ func (s *EtcdServer) run() {

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.GoAttach(func() {
// We shouldn't revoke any leases if current member isn't a leader,
// because the operation should only be performed by the leader. When
// the leader gets blocked on the raft loop, such as writing WAL entries,
// it can't process any events or messages from raft. It may think it
// is still the leader even the leader has already changed.
// Refer to https://github.com/etcd-io/etcd/issues/15247
lg := s.Logger()
if !s.ensureLeadership() {
lg.Warn("Ignore the lease revoking request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.MemberId())))
return
}

// Increases throughput of expired leases deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
Expand Down Expand Up @@ -877,6 +896,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
})
}

// ensureLeadership checks whether current member is still the leader.
func (s *EtcdServer) ensureLeadership() bool {
lg := s.Logger()

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
defer cancel()
if err := s.linearizableReadNotify(ctx); err != nil {
lg.Warn("Failed to check current member's leadership",
zap.Error(err))
return false
}

newLeaderId := s.raftStatus().Lead
if newLeaderId != uint64(s.MemberId()) {
lg.Warn("Current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.MemberId())),
zap.Uint64("new-lead", newLeaderId))
return false
}

return true
}

// Cleanup removes allocated objects by EtcdServer.NewServer in
// situation that EtcdServer::Start was not called (that takes care of cleanup).
func (s *EtcdServer) Cleanup() {
Expand Down
10 changes: 10 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,16 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
// If s.isLeader() returns true, but we fail to ensure the current
// member's leadership, there are a couple of possibilities:
// 1. current member gets stuck on writing WAL entries;
// 2. current member is in network isolation status;
// 3. current member isn't a leader anymore (possibly due to #1 above).
// In such case, we just return error to client, so that the client can
// switch to another member to continue the lease keep-alive operation.
if !s.ensureLeadership() {
return -1, lease.ErrNotPrimary
}
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
Expand Down
10 changes: 7 additions & 3 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type RangeDeleter func() TxnDelete

// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

type LeaseID int64

Expand Down Expand Up @@ -422,7 +422,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
return -1, err
}
}

le.mu.Lock()
Expand Down Expand Up @@ -656,7 +658,9 @@ func (le *lessor) checkpointScheduledLeases() {
le.mu.Unlock()

if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
return
}
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
Expand Down
6 changes: 4 additions & 2 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer os.RemoveAll(dir)

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
}
return nil
}
defer le.Stop()
// Set checkpointer
Expand Down Expand Up @@ -556,7 +557,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
close(checkpointedC)
if len(lc.Checkpoints) != 1 {
t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
Expand All @@ -565,6 +566,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
if c.Remaining_TTL != 1 {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
return nil
})
_, err := le.Grant(1, 2)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestV3LeaseFailover(t *testing.T) {

// send keep alive to old leader until the old leader starts
// to drop lease request.
var expectedExp time.Time
expectedExp := time.Now().Add(5 * time.Second)
for {
if err = lac.Send(lreq); err != nil {
break
Expand Down

0 comments on commit 67f1716

Please sign in to comment.