diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index b8797618875..25be6fb4aa3 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error { be := backend.NewDefaultBackend(dbpath) // a lessor never timeouts leases - lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) + lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) txn := mvs.Write(traceutil.TODO()) diff --git a/embed/config.go b/embed/config.go index 70a311c6cb1..247a4c1bd33 100644 --- a/embed/config.go +++ b/embed/config.go @@ -288,10 +288,15 @@ type Config struct { ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` // ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types). ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"` - // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` - ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + // Requires experimental-enable-lease-checkpoint to be enabled. + // Deprecated in v3.6. + // TODO: Delete in v3.7 + ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` @@ -637,6 +642,14 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode) } + if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint { + cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist") + } + + if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint { + return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") + } + return nil } diff --git a/embed/etcd.go b/embed/etcd.go index 477f02e0603..5af6eee4fe8 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -208,6 +208,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { EnableGRPCGateway: cfg.EnableGRPCGateway, UnsafeNoFsync: cfg.UnsafeNoFsync, EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, WarningApplyDuration: cfg.ExperimentalWarningApplyDuration, diff --git a/etcdmain/config.go b/etcdmain/config.go index 751257c349b..209ec89663d 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -257,7 +257,9 @@ func newConfig() *config { fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.") fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)") - fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") + // TODO: delete in v3.7 + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.") diff --git a/etcdserver/config.go b/etcdserver/config.go index 70208d2f223..e3084864dc0 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -158,10 +158,12 @@ type ServerConfig struct { ForceNewCluster bool - // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. + // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. EnableLeaseCheckpoint bool // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints. LeaseCheckpointInterval time.Duration + // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + LeaseCheckpointPersist bool EnableGRPCGateway bool diff --git a/etcdserver/server.go b/etcdserver/server.go index 1638719ffc6..d32a37e0a3a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -555,9 +555,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { srv.lessor = lease.NewLessor( srv.getLogger(), srv.be, + srv.cluster, lease.LessorConfig{ MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, + CheckpointPersist: cfg.LeaseCheckpointPersist, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), }) diff --git a/integration/cluster.go b/integration/cluster.go index 121414e1437..11d44c96541 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -152,6 +152,7 @@ type ClusterConfig struct { EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration + LeaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration CorruptCheckTime time.Duration @@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member { clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, + leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, CorruptCheckTime: c.cfg.CorruptCheckTime, @@ -590,6 +592,7 @@ type memberConfig struct { useIP bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration + leaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration CorruptCheckTime time.Duration } @@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member { m.useIP = mcfg.useIP m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval + m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 26fa25a9482..a157f57b68e 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -313,54 +313,121 @@ func TestV3LeaseKeepAlive(t *testing.T) { // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted // across leader elections. func TestV3LeaseCheckpoint(t *testing.T) { - var ttl int64 = 300 - leaseInterval := 2 * time.Second - defer testutil.AfterTest(t) - clus := NewClusterV3(t, &ClusterConfig{ - Size: 3, - EnableLeaseCheckpoint: true, - LeaseCheckpointInterval: leaseInterval, - }) - defer clus.Terminate(t) - - // create lease - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - c := toGRPC(clus.RandClient()) - lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl}) - if err != nil { - t.Fatal(err) + tcs := []struct { + name string + checkpointingEnabled bool + ttl time.Duration + checkpointingInterval time.Duration + checkpointingPersist bool + leaderChanges int + clusterSize int + expectTTLIsGT time.Duration + expectTTLIsLT time.Duration + }{ + { + name: "Checkpointing disabled, lease TTL is reset", + ttl: 300 * time.Second, + leaderChanges: 1, + clusterSize: 3, + expectTTLIsGT: 298 * time.Second, + }, + { + name: "Checkpointing enabled 10s, lease TTL is preserved after leader change", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + clusterSize: 3, + expectTTLIsLT: 290 * time.Second, + }, + { + name: "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + checkpointingPersist: true, + leaderChanges: 1, + clusterSize: 1, + expectTTLIsLT: 290 * time.Second, + }, + { + name: "Checkpointing enabled 10s, lease TTL is reset after restart", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + clusterSize: 1, + expectTTLIsGT: 298 * time.Second, + }, + { + // Checking if checkpointing continues after the first leader change. + name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 2, + clusterSize: 3, + expectTTLIsLT: 280 * time.Second, + }, } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + config := &ClusterConfig{ + Size: tc.clusterSize, + EnableLeaseCheckpoint: tc.checkpointingEnabled, + LeaseCheckpointInterval: tc.checkpointingInterval, + LeaseCheckpointPersist: tc.checkpointingPersist, + SnapshotCount: 5, + } + clus := NewClusterV3(t, config) + defer clus.Terminate(t) + + // create lease + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := toGRPC(clus.RandClient()) + lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())}) + if err != nil { + t.Fatal(err) + } - // wait for a checkpoint to occur - time.Sleep(leaseInterval + 1*time.Second) + for i := 0; i < tc.leaderChanges; i++ { + // wait for a checkpoint to occur + time.Sleep(tc.checkpointingInterval + 1*time.Second) - // Force a leader election - leaderId := clus.WaitLeader(t) - leader := clus.Members[leaderId] - leader.Stop(t) - time.Sleep(time.Duration(3*electionTicks) * tickDuration) - leader.Restart(t) - newLeaderId := clus.WaitLeader(t) - c2 := toGRPC(clus.Client(newLeaderId)) + // Force a leader election + leaderId := clus.WaitLeader(t) + leader := clus.Members[leaderId] + leader.Stop(t) + time.Sleep(time.Duration(3*electionTicks) * tickDuration) + leader.Restart(t) + } - time.Sleep(250 * time.Millisecond) + newLeaderId := clus.WaitLeader(t) + c2 := toGRPC(clus.Client(newLeaderId)) - // Check the TTL of the new leader - var ttlresp *pb.LeaseTimeToLiveResponse - for i := 0; i < 10; i++ { - if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { - if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { - time.Sleep(time.Millisecond * 250) - } else { - t.Fatal(err) + time.Sleep(250 * time.Millisecond) + + // Check the TTL of the new leader + var ttlresp *pb.LeaseTimeToLiveResponse + for i := 0; i < 10; i++ { + if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { + time.Sleep(time.Millisecond * 250) + } else { + t.Fatal(err) + } + } + } + + if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT { + t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT) } - } - } - expectedTTL := ttl - int64(leaseInterval.Seconds()) - if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL { - t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL) + if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT { + t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT) + } + }) } } diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index 0802515d510..dc9ee0c8ef8 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/lease/lessor.go b/lease/lessor.go index ffbd15e1422..9ad8f137d53 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease/leasepb" "go.etcd.io/etcd/mvcc/backend" @@ -37,6 +38,8 @@ const NoLease = LeaseID(0) // MaxLeaseTTL is the maximum lease TTL value const MaxLeaseTTL = 9000000000 +var v3_6 = semver.Version{Major: 3, Minor: 6} + var ( forever = time.Time{} @@ -182,19 +185,29 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration + // whether lessor should always persist remaining TTL (always enabled in v3.6). + checkpointPersist bool + // cluster is used to adapt lessor logic based on cluster version + cluster cluster +} + +type cluster interface { + // Version is the cluster-wide minimum major.minor version. + Version() *semver.Version } type LessorConfig struct { MinLeaseTTL int64 CheckpointInterval time.Duration ExpiredLeasesRetryInterval time.Duration + CheckpointPersist bool } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { - return newLessor(lg, b, cfg) +func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { + return newLessor(lg, b, cluster, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -212,11 +225,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { minLeaseTTL: cfg.MinLeaseTTL, checkpointInterval: checkpointInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval, + checkpointPersist: cfg.CheckpointPersist, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, + cluster: cluster, } l.initAndRecover() @@ -353,6 +368,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL + if le.shouldPersistCheckpoints() { + l.persistTo(le.b) + } if le.isPrimary() { // schedule the next checkpoint as needed le.scheduleCheckpointIfNeeded(l) @@ -361,6 +379,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil } +func (le *lessor) shouldPersistCheckpoints() bool { + cv := le.cluster.Version() + return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6)) +} + +func greaterOrEqual(first, second semver.Version) bool { + return !first.LessThan(second) +} + // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { @@ -448,6 +475,7 @@ func (le *lessor) Promote(extend time.Duration) { l.refresh(extend) item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} le.leaseExpiredNotifier.RegisterOrUpdate(item) + le.scheduleCheckpointIfNeeded(l) } if len(le.leaseMap) < leaseRevokeRate { @@ -785,9 +813,10 @@ func (le *lessor) initAndRecover() { ttl: lpb.TTL, // itemSet will be filled in when recover key-value pairs // set expiry to forever, refresh when promoted - itemSet: make(map[LeaseItem]struct{}), - expiry: forever, - revokec: make(chan struct{}), + itemSet: make(map[LeaseItem]struct{}), + expiry: forever, + revokec: make(chan struct{}), + remainingTTL: lpb.RemainingTTL, } } le.leaseExpiredNotifier.Init() diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index cf51aee70ae..da115c09388 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) { be, tmpPath := backend.NewDefaultTmpBackend() // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) + le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/lease/lessor_test.go b/lease/lessor_test.go index bf8b58460cb..264a1aa1819 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -27,10 +27,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease/leasepb" "go.etcd.io/etcd/mvcc/backend" + "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/version" "go.uber.org/zap" ) @@ -48,7 +50,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -110,7 +112,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -159,7 +161,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -212,7 +214,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -245,7 +247,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -294,7 +296,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -313,7 +315,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -343,7 +345,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -384,7 +386,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -393,7 +395,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -414,7 +416,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -467,7 +469,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -516,7 +518,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -532,7 +534,8 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -545,13 +548,11 @@ func TestLessorCheckpointScheduling(t *testing.T) { t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) } }) - defer le.Stop() - le.Promote(0) - _, err := le.Grant(1, 2) if err != nil { t.Fatal(err) } + le.Promote(0) // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds. select { @@ -567,7 +568,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil { @@ -653,7 +654,76 @@ func TestLeaseBackend(t *testing.T) { defer be2.Close() leases := unsafeGetAllLeases(be2.ReadTx()) - assert.Equal(t, tc.want, leases) + testutil.AssertEqual(t, tc.want, leases) + }) + } +} + +func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) { + const ttl int64 = 10 + const checkpointTTL int64 = 5 + + tcs := []struct { + name string + cluster cluster + checkpointPersist bool + expectRemainingTTL int64 + }{ + { + name: "Etcd v3.6 and newer persist remainingTTL on checkpoint", + cluster: clusterV3_6(), + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set", + cluster: clusterLatest(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set", + cluster: clusterNil(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older reset remainingTTL on checkpoint", + cluster: clusterLatest(), + expectRemainingTTL: ttl, + }, + { + name: "Etcd with version unknown fallbacks to v3.5 behavior", + cluster: clusterNil(), + expectRemainingTTL: ttl, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + cfg := LessorConfig{MinLeaseTTL: minLeaseTTL} + cfg.CheckpointPersist = tc.checkpointPersist + le := newLessor(lg, be, tc.cluster, cfg) + l, err := le.Grant(2, ttl) + if err != nil { + t.Fatal(err) + } + if l.RemainingTTL() != ttl { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl) + } + le.Checkpoint(2, checkpointTTL) + if l.RemainingTTL() != checkpointTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL) + } + le.Stop() + le2 := newLessor(lg, be, clusterV3_6(), cfg) + l = le2.Lookup(2) + if l.RemainingTTL() != tc.expectRemainingTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL) + } }) } } @@ -695,3 +765,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } + +func clusterV3_6() cluster { + return fakeCluster{semver.New("3.6.0")} +} + +func clusterLatest() cluster { + return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")} +} + +func clusterNil() cluster { + return fakeCluster{} +} + +type fakeCluster struct { + version *semver.Version +} + +func (c fakeCluster) Version() *semver.Version { + return c.version +}