Skip to content

Commit

Permalink
lease: process renew request via raft
Browse files Browse the repository at this point in the history
Previously, the renew request could only be processed by the leader.
If a follower received the renew request, it just forwarded the
request to the leader via an internal HTTP channel. This isn't
accurate because the leader may change during the process.

When a leader receives the renew request, the previous implementation
follows a three-stage workflow: pre-raft, raft and post-raft. It's
too complicated and error-prone, and raft is used more like a
network transport channel than a consensus mechanism in this
case.

So we process the renew request via raft directly, which greatly
simplifies the code.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Mar 7, 2023
1 parent 8ba1e26 commit 899a60a
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 31 deletions.
17 changes: 1 addition & 16 deletions server/etcdserver/api/v3rpc/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
return err
}

// Create header before we sent out the renew request.
// This can make sure that the revision is strictly smaller or equal to
// when the keepalive happened at the local server (when the local server is the leader)
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)

ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
}

resp, err := ls.le.LeaseRenew(stream.Context(), req)
if err != nil {
return togRPCError(err)
}

resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
if isClientCtxErr(stream.Context().Err(), err) {
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type applierV3 interface {

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

Expand Down Expand Up @@ -207,6 +208,11 @@ func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return &pb.LeaseRevokeResponse{Header: a.newHeader()}, err
}

// LeaseRenew applies a lease keep-alive request: it renews the lease with the
// requested ID in the lessor and returns a response carrying a fresh header,
// the lease ID and the TTL reported by the lessor.
//
// A lease that the lessor does not know about is reported as a successful
// response with TTL 0 instead of as an error. This keeps the behavior that
// keep-alive callers had before renew was routed through raft, where
// lease.ErrLeaseNotFound was translated into TTL 0 and the keep-alive stream
// stayed open. Any other error is returned unchanged together with the
// response, as before.
func (a *applierV3backend) LeaseRenew(lc *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	ttl, err := a.lessor.Renew(lease.LeaseID(lc.ID))
	if err == lease.ErrLeaseNotFound {
		// Signal "lease is gone" through the TTL rather than failing the request.
		ttl, err = 0, nil
	}
	return &pb.LeaseKeepAliveResponse{Header: a.newHeader(), ID: lc.ID, TTL: ttl}, err
}

func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
for _, c := range lc.Checkpoints {
err := a.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
Expand Down
4 changes: 4 additions & 0 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ func (a *applierV3Corrupt) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantRe
// LeaseRevoke ignores the request and always returns a nil response together
// with errors.ErrCorrupt.
func (a *applierV3Corrupt) LeaseRevoke(_ *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
return nil, errors.ErrCorrupt
}

// LeaseRenew ignores the request and always returns a nil response together
// with errors.ErrCorrupt. The request parameter is intentionally unnamed
// because it is never read.
func (a *applierV3Corrupt) LeaseRenew(_ *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	return nil, errors.ErrCorrupt
}
3 changes: 3 additions & 0 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
case r.LeaseRevoke != nil:
op = "LeaseRevoke"
ar.Resp, ar.Err = a.applyV3.LeaseRevoke(r.LeaseRevoke)
case r.LeaseRenew != nil:
op = "LeaseRenew"
ar.Resp, ar.Err = a.applyV3.LeaseRenew(r.LeaseRenew)
case r.LeaseCheckpoint != nil:
op = "LeaseCheckpoint"
ar.Resp, ar.Err = a.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
Expand Down
17 changes: 12 additions & 5 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ type Lessor interface {
// LeaseRevoke sends LeaseRevoke request to raft and applies it after it is committed.
LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
// is returned.
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
// LeaseRenew renews the lease.
LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error)

// LeaseTimeToLive retrieves lease information.
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
Expand Down Expand Up @@ -276,13 +275,13 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
return resp.(*pb.LeaseRevokeResponse), nil
}

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
func (s *EtcdServer) LeaseRenewV2(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}

ttl, err := s.lessor.Renew(id)
ttl, err := s.lessor.RenewV2(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
Expand Down Expand Up @@ -317,6 +316,14 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
return -1, errors.ErrCanceled
}

// LeaseRenew submits the keep-alive request as an internal raft request and
// returns the response produced when that request is processed. If the raft
// request fails, a nil response and the error are returned.
func (s *EtcdServer) LeaseRenew(ctx context.Context, r *pb.LeaseKeepAliveRequest) (*pb.LeaseKeepAliveResponse, error) {
	result, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRenew: r})
	if err == nil {
		return result.(*pb.LeaseKeepAliveResponse), nil
	}
	return nil, err
}

func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
if s.isLeader() {
if err := s.waitAppliedIndex(); err != nil {
Expand Down
44 changes: 40 additions & 4 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ type Lessor interface {
// Demote demotes the lessor from being the primary lessor.
Demote()

// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
// an error will be returned.
// RenewV2 renews a lease with given ID. It returns the renewed TTL.
// If the ID does not exist, an error will be returned.
// TODO(ahrtr): remove this legacy method in 3.7.
RenewV2(id LeaseID) (int64, error)

// Renew renews a lease with given ID. It returns the renewed TTL.
// If the given lease does not exist, an error will be returned.
Renew(id LeaseID) (int64, error)

// Lookup gives the lease at a given lease id, if any
Expand Down Expand Up @@ -364,7 +369,10 @@ func (le *lessor) Revoke(id LeaseID) error {
// Checkpoint takes the lessor's write lock and delegates to checkpoint,
// returning whatever error that helper reports. The lock is released via
// defer when the call returns.
func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
le.mu.Lock()
defer le.mu.Unlock()
return le.checkpoint(id, remainingTTL)
}

func (le *lessor) checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
Expand All @@ -388,9 +396,10 @@ func greaterOrEqual(first, second semver.Version) bool {
return !version.LessThan(first, second)
}

// Renew renews an existing lease. If the given lease does not exist or
// RenewV2 renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
// TODO(ahrtr): remove the legacy method in 3.7
func (le *lessor) RenewV2(id LeaseID) (int64, error) {
le.mu.RLock()
if !le.isPrimary() {
// forward renew request to primary instead of returning error.
Expand Down Expand Up @@ -442,6 +451,31 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
return l.ttl, nil
}

// Renew renews the lease with the given ID and returns its TTL. It returns
// -1 and ErrLeaseNotFound when the ID is not present in the lease map.
//
// The whole operation runs under the lessor's write lock. On the primary
// lessor the remaining TTL is checkpointed to 0, the lease is refreshed, its
// new expiry is registered with the expiry notifier and the renew counter is
// incremented. On a non-primary lessor only the remaining TTL is checkpointed
// to 0, and only when it is currently positive; expiry is left untouched.
func (le *lessor) Renew(id LeaseID) (int64, error) {
	le.mu.Lock()
	defer le.mu.Unlock()

	target := le.leaseMap[id]
	if target == nil {
		return -1, ErrLeaseNotFound
	}

	if le.isPrimary() {
		// Reset the checkpointed TTL before refreshing, then publish the
		// refreshed expiry to the notifier.
		le.checkpoint(id, 0)
		target.refresh(0)
		le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: target.ID, time: target.expiry})

		leaseRenewed.Inc()
		return target.ttl, nil
	}

	// Not primary: only clear an outstanding checkpoint.
	if target.remainingTTL > 0 {
		le.checkpoint(id, 0)
	}
	return target.ttl, nil
}

func (le *lessor) Lookup(id LeaseID) *Lease {
le.mu.RLock()
defer le.mu.RUnlock()
Expand Down Expand Up @@ -842,6 +876,8 @@ func (fl *FakeLessor) Promote(extend time.Duration) {}

// Demote is a no-op on the fake lessor.
func (fl *FakeLessor) Demote() {}

// RenewV2 ignores the ID and always reports a TTL of 10 with no error.
func (fl *FakeLessor) RenewV2(id LeaseID) (int64, error) { return 10, nil }

// Renew ignores the ID and always reports a TTL of 10 with no error.
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }

// Lookup ignores the ID and always returns nil.
func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }
Expand Down
91 changes: 86 additions & 5 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,8 @@ func TestLessorRecover(t *testing.T) {
}
}

func TestLessorExpire(t *testing.T) {
// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed.
func TestLessorExpireV2(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -455,7 +456,7 @@ func TestLessorExpire(t *testing.T) {
donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
if _, err := le.RenewV2(l.ID); err != ErrLeaseNotFound {
t.Errorf("unexpected renew")
}
donec <- struct{}{}
Expand All @@ -479,7 +480,49 @@ func TestLessorExpire(t *testing.T) {
}
}

func TestLessorExpireAndDemote(t *testing.T) {
// TestLessorExpire checks how Renew treats a lease across its lifecycle:
// a lease that has been reported as expired but not yet revoked can still be
// renewed, an expired lease can be revoked, and once revoked the lease can no
// longer be renewed (Renew must return ErrLeaseNotFound).
func TestLessorExpire(t *testing.T) {
	lg := zap.NewNop()
	dir, be := NewTestBackend(t)
	defer os.RemoveAll(dir)
	defer be.Close()

	testMinTTL := int64(1)

	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
	defer le.Stop()

	le.Promote(1 * time.Second)
	l, err := le.Grant(1, testMinTTL)
	if err != nil {
		t.Fatalf("failed to create lease: %v", err)
	}

	// wait until the lessor reports the lease as expired
	select {
	case el := <-le.ExpiredLeasesC():
		if el[0].ID != l.ID {
			t.Fatalf("expired id = %x, want %x", el[0].ID, l.ID)
		}
	case <-time.After(10 * time.Second):
		t.Fatalf("failed to receive expired lease")
	}

	// expired (but not yet revoked) lease can still be renewed
	if _, err := le.Renew(l.ID); err != nil {
		t.Errorf("failed to renew expired lease: %v", err)
	}

	// expired lease can be revoked
	if err := le.Revoke(l.ID); err != nil {
		t.Fatalf("failed to revoke expired lease: %v", err)
	}

	// revoked lease can't be renewed
	if _, err := le.Renew(l.ID); err != ErrLeaseNotFound {
		t.Errorf("renew of revoked lease: err = %v, want %v", err, ErrLeaseNotFound)
	}
}

// TODO(ahrtr): remove this test case when the legacy `RenewV2` is removed.
func TestLessorExpireAndDemoteV2(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
Expand Down Expand Up @@ -508,7 +551,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
donec := make(chan struct{}, 1)
go func() {
// expired lease cannot be renewed
if _, err := le.Renew(l.ID); err != ErrNotPrimary {
if _, err := le.RenewV2(l.ID); err != ErrNotPrimary {
t.Errorf("unexpected renew: %v", err)
}
donec <- struct{}{}
Expand All @@ -520,7 +563,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
case <-time.After(50 * time.Millisecond):
}

// demote will cause the renew request to fail with ErrNotPrimary
// demote will cause the renewV2 request to fail with ErrNotPrimary
le.Demote()

select {
Expand All @@ -530,6 +573,44 @@ func TestLessorExpireAndDemote(t *testing.T) {
}
}

// TestLessorExpireAndDemote checks that Renew succeeds on a lease that has
// been reported as expired, and that it keeps succeeding after the lessor
// has been demoted from primary.
func TestLessorExpireAndDemote(t *testing.T) {
	logger := zap.NewNop()
	dir, be := NewTestBackend(t)
	defer os.RemoveAll(dir)
	defer be.Close()

	const minTTL = int64(1)

	le := newLessor(logger, be, clusterLatest(), LessorConfig{MinLeaseTTL: minTTL})
	defer le.Stop()

	le.Promote(1 * time.Second)
	granted, err := le.Grant(1, minTTL)
	if err != nil {
		t.Fatalf("failed to create lease: %v", err)
	}

	// Block until the lessor announces the expiry, or give up after 10s.
	select {
	case expired := <-le.ExpiredLeasesC():
		if got := expired[0].ID; got != granted.ID {
			t.Fatalf("expired id = %x, want %x", got, granted.ID)
		}
	case <-time.After(10 * time.Second):
		t.Fatalf("failed to receive expired lease")
	}

	// Renew while still primary.
	_, err = le.Renew(granted.ID)
	if err != nil {
		t.Errorf("unexpected renew: %v", err)
	}

	le.Demote()

	// renew should work after demote.
	_, err = le.Renew(granted.ID)
	if err != nil {
		t.Errorf("unexpected renew: %v", err)
	}
}

func TestLessorMaxTTL(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestV3LeaseFailover(t *testing.T) {

// send keep alive to old leader until the old leader starts
// to drop lease request.
var expectedExp time.Time
expectedExp := time.Now().Add(time.Duration(lresp.TTL) * time.Second)
for {
if err = lac.Send(lreq); err != nil {
break
Expand Down

0 comments on commit 899a60a

Please sign in to comment.