Skip to content

Commit e37c792

Browse files
committed
Squashed 'src/github.com/tsenart/tb/' changes from 1fda845..9fbec38
9fbec38 Travis: download tests deps aef8c07 Updated Travis config 0c24aae Move back to Travis cf767b3 Fix doc typo c86e63e Implement Wait method on Bucket f38b46e TestBucket_Take_Throughput: GOMAXPROCS(2) git-subtree-dir: src/github.com/tsenart/tb git-subtree-split: 9fbec382fc274088cc45d8d3c0731d7da5d7e46c
1 parent c8229ff commit e37c792

File tree

6 files changed

+114
-45
lines changed

6 files changed

+114
-45
lines changed

.travis.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
language: go
2+
3+
go:
4+
- 1.2
5+
- 1.3
6+
- tip
7+
8+
install:
9+
- go get -v code.google.com/p/go.tools/cmd/vet
10+
- go get -v github.com/golang/lint/golint
11+
- go get -d -t -v ./...
12+
- go build -v ./...
13+
14+
script:
15+
- go vet ./...
16+
- $HOME/gopath/bin/golint .
17+
- go test -v -parallel=8 ./...

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Token Bucket (tb) [![Build Status](https://drone.io/github.com/tsenart/tb/status.png)](https://drone.io/github.com/tsenart/tb/latest) [![GoDoc](https://godoc.org/github.com/tsenart/tb?status.png)](https://godoc.org/github.com/tsenart/tb)
1+
# Token Bucket (tb) [![Build Status](https://secure.travis-ci.org/tsenart/tb.png)](http://travis-ci.org/tsenart/tb) [![GoDoc](https://godoc.org/github.com/tsenart/tb?status.png)](https://godoc.org/github.com/tsenart/tb)
22

33
This package provides a generic lock-free implementation of the "Token bucket"
44
algorithm where handling of non-conformity is left to the user.

bucket.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,29 @@ type Bucket struct {
1111
inc int64
1212
tokens int64
1313
capacity int64
14+
freq time.Duration
1415
closing chan struct{}
1516
}
1617

1718
// NewBucket returns a full Bucket with c capacity and starts a filling
1819
// go-routine which ticks every freq. The number of tokens added on each tick
1920
// is computed dynamically to be even across the duration of a second.
2021
//
21-
// If freq == -1 then it will be adjusted to 1/c seconds. Otherwise,
22-
// If freq < 1/c seconds, the filling go-routine won't be started.
22+
// If freq == -1 then the filling go-routine won't be started. Otherwise,
23+
// If freq < 1/c seconds, then it will be adjusted to 1/c seconds.
2324
func NewBucket(c int64, freq time.Duration) *Bucket {
2425
b := &Bucket{tokens: c, capacity: c, closing: make(chan struct{})}
2526

2627
if freq == -1 {
27-
freq = time.Duration(1e9 / c)
28-
} else if freq.Seconds() < 1/float64(c) {
2928
return b
29+
} else if evenFreq := time.Duration(1e9 / c); freq < evenFreq {
30+
freq = evenFreq
3031
}
3132

32-
go b.fill(freq)
33+
b.freq = freq
34+
b.inc = int64(math.Floor(.5 + (float64(c) * freq.Seconds())))
35+
36+
go b.fill()
3337

3438
return b
3539
}
@@ -76,17 +80,43 @@ func (b *Bucket) Put(n int64) (added int64) {
7680
}
7781
}
7882

83+
// Wait waits for n amount of tokens to be available.
84+
// If n tokens are immediatelly available it doesn't sleep.
85+
// Otherwise, it sleeps the minimum amount of time required for the remaining
86+
// tokens to be available. It returns the wait duration.
87+
//
88+
// This method is thread-safe.
89+
func (b *Bucket) Wait(n int64) time.Duration {
90+
var rem int64
91+
if rem = n - b.Take(n); rem == 0 {
92+
return 0
93+
}
94+
95+
var wait time.Duration
96+
for rem > 0 {
97+
wait += b.wait(rem)
98+
time.Sleep(wait)
99+
rem -= b.Take(rem)
100+
}
101+
return wait
102+
}
103+
79104
// Close stops the filling go-routine given it was started.
80105
func (b *Bucket) Close() error {
81106
close(b.closing)
82107
return nil
83108
}
84109

85-
func (b *Bucket) fill(freq time.Duration) {
86-
ticker := time.NewTicker(freq)
87-
defer ticker.Stop()
110+
// wait returns the minimum amount of time required for n tokens to be available.
111+
// if n > capacity, n will be adjusted to capacity
112+
func (b *Bucket) wait(n int64) time.Duration {
113+
return time.Duration(int64(math.Ceil(math.Min(float64(n), float64(b.capacity))/float64(b.inc))) *
114+
b.freq.Nanoseconds())
115+
}
88116

89-
b.inc = int64(math.Floor(.5 + (float64(b.capacity) * freq.Seconds())))
117+
func (b *Bucket) fill() {
118+
ticker := time.NewTicker(b.freq)
119+
defer ticker.Stop()
90120

91121
for _ = range ticker.C {
92122
select {

bucket_test.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,27 @@
11
package tb
22

33
import (
4+
"fmt"
5+
"runtime"
46
"testing"
57
"time"
68
)
79

810
func TestNewBucket(t *testing.T) {
911
t.Parallel()
1012

11-
b := NewBucket(10, 0)
13+
b := NewBucket(10, -1)
1214
b.Take(10)
1315
time.Sleep(100 * time.Millisecond)
1416
if w, g := int64(0), b.Take(1); w != g {
15-
t.Fatal("Expected no filling when freq < 1/c")
17+
t.Fatal("Expected no filling when freq == -1")
1618
}
1719
}
1820

1921
func TestBucket_Take_single(t *testing.T) {
2022
t.Parallel()
2123

22-
b := NewBucket(10, -1)
24+
b := NewBucket(10, 0)
2325
defer b.Close()
2426

2527
ex := [...]int64{5, 5, 1, 1, 5, 4, 1, 0}
@@ -49,7 +51,7 @@ func TestBucket_Put_single(t *testing.T) {
4951
func TestBucket_Take_multi(t *testing.T) {
5052
t.Parallel()
5153

52-
b := NewBucket(10, -1)
54+
b := NewBucket(10, 0)
5355
defer b.Close()
5456

5557
exs := [2][]int64{{4, 4, 2, 2}, {2, 2, 1, 1}}
@@ -86,11 +88,14 @@ func TestBucket_Put_multi(t *testing.T) {
8688

8789
func TestBucket_Take_throughput(t *testing.T) {
8890
t.Parallel()
91+
8992
if testing.Short() {
9093
t.Skip("Skipping test in short mode")
9194
}
9295

93-
b := NewBucket(1000, -1)
96+
runtime.GOMAXPROCS(2)
97+
98+
b := NewBucket(1000, 0)
9499
defer b.Close()
95100

96101
b.Take(1000)
@@ -110,7 +115,7 @@ func TestBucket_Take_throughput(t *testing.T) {
110115
}
111116

112117
func BenchmarkBucket_Take_sequential(b *testing.B) {
113-
bucket := NewBucket(int64(b.N), -1)
118+
bucket := NewBucket(int64(b.N), 0)
114119
defer bucket.Close()
115120

116121
b.ResetTimer()
@@ -130,8 +135,42 @@ func BenchmarkBucket_Put_sequential(b *testing.B) {
130135
bucket.Put(8)
131136
}
132137
}
138+
139+
func TestBucket_Wait(t *testing.T) {
140+
t.Parallel()
141+
142+
cases := map[*Bucket]time.Duration{
143+
NewBucket(1e3, 500*time.Millisecond): 1 * time.Second,
144+
NewBucket(1e3, 20*time.Millisecond): 1 * time.Second,
145+
NewBucket(1e3, 1*time.Millisecond): 1 * time.Second,
146+
NewBucket(1e3, 0): 1 * time.Second,
147+
NewBucket(2e3, 0): 0,
148+
NewBucket(3e3, 0): 0,
149+
}
150+
errors := make(chan error, len(cases))
151+
152+
for bucket, wait := range cases {
153+
go func(bucket *Bucket, wait time.Duration) {
154+
defer bucket.Close()
155+
156+
if got := bucket.Wait(2000); int(wait.Seconds()) != int(got.Seconds()) {
157+
errors <- fmt.Errorf("bucket.Wait(2000) with cap=%d, freq=%s: Want: %s, Got %s",
158+
bucket.capacity, bucket.freq, wait, got)
159+
} else {
160+
errors <- nil
161+
}
162+
}(bucket, wait)
163+
}
164+
165+
for i := 0; i < cap(errors); i++ {
166+
if err := <-errors; err != nil {
167+
t.Error(err)
168+
}
169+
}
170+
}
171+
133172
func TestBucket_Close(t *testing.T) {
134-
b := NewBucket(10000, -1)
173+
b := NewBucket(10000, 0)
135174
b.Close()
136175
b.Take(10000)
137176
time.Sleep(10 * time.Millisecond)

throttler.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ func (t *Throttler) Bucket(key string, rate int64) *Bucket {
4747
t.mu.RUnlock()
4848

4949
if !ok {
50-
b = NewBucket(rate, 0)
50+
b = NewBucket(rate, -1)
5151
b.inc = int64(math.Floor(.5 + (float64(b.capacity) * t.freq.Seconds())))
52+
b.freq = t.freq
5253
t.mu.Lock()
5354
t.buckets[key] = b
5455
t.mu.Unlock()
@@ -57,32 +58,18 @@ func (t *Throttler) Bucket(key string, rate int64) *Bucket {
5758
return b
5859
}
5960

60-
// Wait waits for n amount of tokens to be available, sleeping freq between each
61-
// take. It returns the wait duration and whether it had to wait or not.
61+
// Wait waits for n amount of tokens to be available.
62+
// If n tokens are immediatelly available it doesn't sleep. Otherwise, it sleeps
63+
// the minimum amount of time required for the remaining tokens to be available.
64+
// It returns the wait duration.
6265
//
6366
// If a Bucket (key, rate) doesn't exist yet, it is created.
6467
// If freq < 1/rate seconds, the effective wait rate won't be correct.
6568
//
6669
// You must call Close when you're done with the Throttler in order to not leak
6770
// a go-routine and a system-timer.
68-
func (t *Throttler) Wait(key string, n, rate int64) (time.Duration, bool) {
69-
var (
70-
got int64
71-
began = time.Now()
72-
)
73-
74-
b := t.Bucket(key, rate)
75-
76-
if got = b.Take(n); got == n {
77-
return time.Since(began), false
78-
}
79-
80-
for got < n {
81-
got += b.Take(n - got)
82-
time.Sleep(t.freq)
83-
}
84-
85-
return time.Since(began), true
71+
func (t *Throttler) Wait(key string, n, rate int64) time.Duration {
72+
return t.Bucket(key, rate).Wait(n)
8673
}
8774

8875
// Halt returns a bool indicating if the Bucket identified by key and rate has

throttler_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,12 @@ func TestThrottler_Wait(t *testing.T) {
5656
th := NewThrottler(1 * time.Millisecond)
5757
defer th.Close()
5858

59-
if _, waited := th.Wait("a", 1000, 1000); waited {
59+
if wait := th.Wait("a", 1000, 1000); wait > 0 {
6060
t.Fatal("Didn't expect wait")
6161
}
6262

63-
if took, waited := th.Wait("a", 1000, 1000); !waited || int(took.Seconds()) != 1 {
64-
t.Fatalf("Expected wait of 1s. Got: %s", took)
65-
}
66-
67-
if _, waited := th.Wait("b", 1000, 1000); waited {
68-
t.Fatal("Didn't expect wait")
63+
if wait := th.Wait("a", 2000, 1000); int(wait.Seconds()) != 2 {
64+
t.Fatalf("Expected wait of 2s. Got: %s", wait)
6965
}
7066
}
7167

0 commit comments

Comments
 (0)