[3.4] Checkpoints fix 3.4 #14253

Merged · 4 commits · Jul 22, 2022
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
@@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error {
 	be := backend.NewDefaultBackend(dbpath)

 	// a lessor never timeouts leases
-	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
+	lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

 	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
 	txn := mvs.Write(traceutil.TODO())
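Note: `lease.NewLessor` gains a third parameter in this backport — the cluster the lessor belongs to. The snapshot-restore path above passes `nil` because no live cluster exists at that point, while `etcdserver` (below) passes `srv.cluster`. The `lease` package itself is not part of this diff, so the following is only an inferred sketch of the narrow interface the lessor plausibly accepts, reconstructed from the call sites in this PR; treat the names as assumptions rather than the actual API.

```go
// Sketch only: inferred from how callers in this PR use the new argument;
// the real definition lives in lease/lessor.go, which is not shown here.
package lease

import "github.com/coreos/go-semver/semver"

// cluster is assumed to expose the cluster-wide minimum version, which the
// lessor can consult when deciding whether checkpoints may be persisted.
// Callers with no cluster (e.g. snapshot restore) pass nil.
type cluster interface {
	// Version is the cluster-wide minimum major.minor version.
	Version() *semver.Version
}
```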
21 changes: 17 additions & 4 deletions embed/config.go
@@ -288,10 +288,15 @@ type Config struct {
 	ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
 	// ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
 	ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
-	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
-	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
-	ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
-	ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
+	// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
+	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+	// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	// Requires experimental-enable-lease-checkpoint to be enabled.
+	// Deprecated in v3.6.
+	// TODO: Delete in v3.7
+	ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
+	ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
+	ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
 	// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
 	// takes more time than this value.
 	ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
@@ -637,6 +642,14 @@ func (cfg *Config) Validate() error {
 		return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
 	}

+	if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
+		cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
+	}
+
+	if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
+		return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
+	}
+
 	return nil
 }
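Taken together, the two new `Validate()` branches make the dependency one-way: checkpointing without persistence is allowed but warns, while persistence without checkpointing is rejected. A minimal sketch of an embedded server opting in to both — the data directory is illustrative; the flags are the ones added in this PR:

```go
package main

import "go.etcd.io/etcd/embed"

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "/tmp/etcd-data" // illustrative path
	// Checkpoint remaining TTLs to followers, and also persist them to the
	// backend so they survive full restarts, not just leader changes.
	cfg.ExperimentalEnableLeaseCheckpoint = true
	cfg.ExperimentalEnableLeaseCheckpointPersist = true
	// Note: setting only the persist flag would fail Validate() with
	// "setting experimental-enable-lease-checkpoint-persist requires
	// experimental-enable-lease-checkpoint".

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		panic(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
}
```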
1 change: 1 addition & 0 deletions embed/etcd.go
@@ -208,6 +208,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		EnableGRPCGateway: cfg.EnableGRPCGateway,
 		UnsafeNoFsync: cfg.UnsafeNoFsync,
 		EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
+		LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
 		CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
 		WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
 		WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
4 changes: 3 additions & 1 deletion etcdmain/config.go
@@ -257,7 +257,9 @@ func newConfig() *config {
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
 	fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
 	fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
-	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+	// TODO: delete in v3.7
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
 	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
 	fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")
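At the command line, the same pair of options (all other flags elided) is: etcd --experimental-enable-lease-checkpoint --experimental-enable-lease-checkpoint-persist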
4 changes: 3 additions & 1 deletion etcdserver/config.go
@@ -158,10 +158,12 @@ type ServerConfig struct {

 	ForceNewCluster bool

-	// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	EnableLeaseCheckpoint bool
 	// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
 	LeaseCheckpointInterval time.Duration
+	// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	LeaseCheckpointPersist bool

 	EnableGRPCGateway bool

2 changes: 2 additions & 0 deletions etcdserver/server.go
@@ -543,9 +543,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 	srv.lessor = lease.NewLessor(
 		srv.getLogger(),
 		srv.be,
+		srv.cluster,
 		lease.LessorConfig{
 			MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
 			CheckpointInterval: cfg.LeaseCheckpointInterval,
+			CheckpointPersist: cfg.LeaseCheckpointPersist,
 			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 		})

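`NewServer` now threads both the cluster handle and the `CheckpointPersist` flag into the lessor. The lessor-side gating is in lease/lessor.go, which this PR changes but the paste does not show; based on the upstream v3.5 fix this backports, the decision plausibly combines the operator flag with a cluster-version check. A self-contained sketch under those assumptions — the helper names and the v3.6 threshold are inferred, not taken from this diff:

```go
package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// cluster mirrors the narrow dependency the lessor is assumed to take:
// just the cluster-wide minimum version.
type cluster interface {
	Version() *semver.Version
}

// shouldPersistCheckpoints reproduces the assumed gating: persist remaining
// TTLs either because the operator opted in (the new flag) or because the
// whole cluster is at a version where persistence is unconditional (v3.6+).
func shouldPersistCheckpoints(checkpointPersist bool, c cluster) bool {
	if checkpointPersist {
		return true
	}
	if c == nil {
		return false // e.g. snapshot restore passes a nil cluster
	}
	cv := c.Version()
	return cv != nil && !cv.LessThan(semver.Version{Major: 3, Minor: 6})
}

type staticCluster semver.Version

func (s *staticCluster) Version() *semver.Version { return (*semver.Version)(s) }

func main() {
	v35 := staticCluster(semver.Version{Major: 3, Minor: 5})
	fmt.Println(shouldPersistCheckpoints(false, &v35)) // false: needs the flag on v3.5-era clusters
	fmt.Println(shouldPersistCheckpoints(true, &v35))  // true: operator opted in
}
```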
4 changes: 4 additions & 0 deletions integration/cluster.go
@@ -152,6 +152,7 @@ type ClusterConfig struct {

 	EnableLeaseCheckpoint bool
 	LeaseCheckpointInterval time.Duration
+	LeaseCheckpointPersist bool

 	WatchProgressNotifyInterval time.Duration
 	CorruptCheckTime time.Duration
@@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
 		clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
 		useIP: c.cfg.UseIP,
 		enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
+		leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
 		leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
 		WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
 		CorruptCheckTime: c.cfg.CorruptCheckTime,
@@ -590,6 +592,7 @@ type memberConfig struct {
 	useIP bool
 	enableLeaseCheckpoint bool
 	leaseCheckpointInterval time.Duration
+	leaseCheckpointPersist bool
 	WatchProgressNotifyInterval time.Duration
 	CorruptCheckTime time.Duration
 }
@@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
 	m.useIP = mcfg.useIP
 	m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
 	m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
+	m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist

 	m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

149 changes: 108 additions & 41 deletions integration/v3_lease_test.go
@@ -313,54 +313,121 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-	var ttl int64 = 300
-	leaseInterval := 2 * time.Second
-	defer testutil.AfterTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{
-		Size: 3,
-		EnableLeaseCheckpoint: true,
-		LeaseCheckpointInterval: leaseInterval,
-	})
-	defer clus.Terminate(t)
-
-	// create lease
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c := toGRPC(clus.RandClient())
-	lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// wait for a checkpoint to occur
-	time.Sleep(leaseInterval + 1*time.Second)
-
-	// Force a leader election
-	leaderId := clus.WaitLeader(t)
-	leader := clus.Members[leaderId]
-	leader.Stop(t)
-	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
-	leader.Restart(t)
-	newLeaderId := clus.WaitLeader(t)
-	c2 := toGRPC(clus.Client(newLeaderId))
-
-	time.Sleep(250 * time.Millisecond)
-
-	// Check the TTL of the new leader
-	var ttlresp *pb.LeaseTimeToLiveResponse
-	for i := 0; i < 10; i++ {
-		if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
-			if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
-				time.Sleep(time.Millisecond * 250)
-			} else {
-				t.Fatal(err)
-			}
-		}
-	}
-
-	expectedTTL := ttl - int64(leaseInterval.Seconds())
-	if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
-		t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
-	}
+	tcs := []struct {
+		name                  string
+		checkpointingEnabled  bool
+		ttl                   time.Duration
+		checkpointingInterval time.Duration
+		checkpointingPersist  bool
+		leaderChanges         int
+		clusterSize           int
+		expectTTLIsGT         time.Duration
+		expectTTLIsLT         time.Duration
+	}{
+		{
+			name:          "Checkpointing disabled, lease TTL is reset",
+			ttl:           300 * time.Second,
+			leaderChanges: 1,
+			clusterSize:   3,
+			expectTTLIsGT: 298 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			clusterSize:           3,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			checkpointingPersist:  true,
+			leaderChanges:         1,
+			clusterSize:           1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is reset after restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			clusterSize:           1,
+			expectTTLIsGT:         298 * time.Second,
+		},
+		{
+			// Checking if checkpointing continues after the first leader change.
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         2,
+			clusterSize:           3,
+			expectTTLIsLT:         280 * time.Second,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			config := &ClusterConfig{
+				Size:                    tc.clusterSize,
+				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
+				LeaseCheckpointInterval: tc.checkpointingInterval,
+				LeaseCheckpointPersist:  tc.checkpointingPersist,
+				SnapshotCount:           5,
+			}
+			clus := NewClusterV3(t, config)
+			defer clus.Terminate(t)
+
+			// create lease
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			c := toGRPC(clus.RandClient())
+			lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			for i := 0; i < tc.leaderChanges; i++ {
+				// wait for a checkpoint to occur
+				time.Sleep(tc.checkpointingInterval + 1*time.Second)
+
+				// Force a leader election
+				leaderId := clus.WaitLeader(t)
+				leader := clus.Members[leaderId]
+				leader.Stop(t)
+				time.Sleep(time.Duration(3*electionTicks) * tickDuration)
+				leader.Restart(t)
+			}
+
+			newLeaderId := clus.WaitLeader(t)
+			c2 := toGRPC(clus.Client(newLeaderId))
+
+			time.Sleep(250 * time.Millisecond)
+
+			// Check the TTL of the new leader
+			var ttlresp *pb.LeaseTimeToLiveResponse
+			for i := 0; i < 10; i++ {
+				if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+					if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+						time.Sleep(time.Millisecond * 250)
+					} else {
+						t.Fatal(err)
+					}
+				}
+			}
+
+			if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+				t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+			}
+
+			if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+				t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
+			}
+		})
+	}
 }

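A note on the test's thresholds: with a 300s TTL and a 10s checkpoint interval, each loop iteration sleeps checkpointingInterval + 1s before forcing an election, so at least one checkpoint fires at roughly t = 10s and records about 290s remaining. A lease whose remaining TTL survived the leader change (or, with persist enabled, a single-member restart) therefore reports at most about 290s (expectTTLIsLT: 290s, or 280s after two rounds), while a lease that was reset reports close to the full 300s, i.e. more than 298s (expectTTLIsGT).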
6 changes: 3 additions & 3 deletions lease/leasehttp/http_test.go
@@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {
@@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {