Skip to content

Commit

Permalink
lease: do lease pile-up reduction in the background
Browse files Browse the repository at this point in the history
This moves lease pile-up reduction into a goroutine which mostly operates on a copy of
the lease list, to avoid locking. This prevents timeouts when the lessor is locked
for a long time (when there are a lot of leases, mostly). This should solve #9496.

We had a problem where when we had a lot of leases (100kish), and a
leader election happened the lessor was locked for a long time causing
timeouts when we tried to grant a lease. This changes it so that the
lessor is locked in batches, which allows for creation of leases while
the leader is initializing the lessor. It still won't start expiring
leases until it's all done rewriting them, though.

Before:
```
BenchmarkLessorPromote1-16                        500000 4036 ns/op
BenchmarkLessorPromote10-16                       500000 3932 ns/op
BenchmarkLessorPromote100-16                      500000 3954 ns/op
BenchmarkLessorPromote1000-16                     300000 3906 ns/op
BenchmarkLessorPromote10000-16                    300000 4639 ns/op
BenchmarkLessorPromote100000-16                      100 27216481 ns/op
BenchmarkLessorPromote1000000-16                     100 325164684 ns/op
```

After:
```
BenchmarkLessorPromote1-16                        500000 3769 ns/op
BenchmarkLessorPromote10-16                       500000 3835 ns/op
BenchmarkLessorPromote100-16                      500000 3829 ns/op
BenchmarkLessorPromote1000-16                     500000 3665 ns/op
BenchmarkLessorPromote10000-16                    500000 3800 ns/op
BenchmarkLessorPromote100000-16                   300000 4114 ns/op
BenchmarkLessorPromote1000000-16                  300000 5143 ns/op
```
  • Loading branch information
braintreeps authored and Micah Gates committed Jun 6, 2018
1 parent 0ad6e52 commit 591a377
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 45 deletions.
108 changes: 64 additions & 44 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}

// when the lease pile-up reduction is done this is true
Ready bool
}

func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
Expand Down Expand Up @@ -329,66 +332,83 @@ func (le *lessor) unsafeLeases() []*Lease {
for _, l := range le.leaseMap {
leases = append(leases, l)
}
sort.Sort(leasesByExpiry(leases))
return leases
}

func (le *lessor) Leases() []*Lease {
le.mu.RLock()
ls := le.unsafeLeases()
le.mu.RUnlock()
sort.Sort(leasesByExpiry(ls))
return ls
}

func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()
le.Ready = false
le.mu.Unlock()
go func() {
le.mu.Lock()
le.demotec = make(chan struct{})
leaseCopy := le.unsafeLeases()
le.mu.Unlock()
var updateList []*LeaseWithTime
defer func() {
le.mu.Lock()
defer le.mu.Unlock()
for _, item := range updateList {
heap.Push(&le.leaseHeap, item)
}
le.Ready = true
}()

// refresh the expiries of all leases.
for _, l := range leaseCopy {
l.refresh(extend)
// check that the lease hasn't been revoked
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}

le.demotec = make(chan struct{})
if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}

// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
// adjust expiries in case of overlap

if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
sort.Sort(leasesByExpiry(leaseCopy))

// adjust expiries in case of overlap
leases := le.unsafeLeases()

baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
baseWindow := leaseCopy[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4

for _, l := range leaseCopy {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
}()
}

type leasesByExpiry []*Lease
Expand Down Expand Up @@ -490,7 +510,7 @@ func (le *lessor) runLoop() {
revokeLimit := leaseRevokeRate / 2

le.mu.RLock()
if le.isPrimary() {
if le.isPrimary() && le.Ready {
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
Expand Down
26 changes: 26 additions & 0 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lease
import (
"os"
"testing"
"time"

"github.com/coreos/etcd/mvcc/backend"
)
Expand Down Expand Up @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b
func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) }
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }

func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) }
func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) }
func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) }
func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) }
func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) }
func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) }
func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) }

func benchmarkLessorFindExpired(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
Expand Down Expand Up @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) {
}
}

func benchmarkLessorPromote(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}

b.ResetTimer()

go func() { le.Promote(100 * time.Second) }()
for i := 0; i < b.N; i++ {
le.Grant(LeaseID(i+size), int64(100+i+size))
}
}

func cleanup(b backend.Backend, path string) {
b.Close()
os.Remove(path)
Expand Down
19 changes: 18 additions & 1 deletion lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"sort"
"sync"
"testing"
Expand All @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)
waitForPromotion(le)

l, err := le.Grant(1, 1)
if err != nil {
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)

waitForPromotion(le)
l, err := le.Grant(1, minLeaseTTL)
if err != nil {
t.Fatalf("failed to grant lease (%v)", err)
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {

// extend after recovery should extend expiration on lease pile-up
le.Promote(0)
waitForPromotion(le)

windowCounts := make(map[int64]int)
for _, l := range le.leaseMap {
Expand Down Expand Up @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg)
}
func waitForPromotion(le *lessor) {
for {
le.mu.RLock()
ready := le.Ready
le.mu.RUnlock()
if ready {
return
} else {
runtime.Gosched()
}
}
}

0 comments on commit 591a377

Please sign in to comment.