Skip to content

Commit 0303cfb

Browse files
committed
Merge pull request #284 from go-redis/fix/pool-tests
Move some tests to pool package.
2 parents 998148b + 93a7fe0 commit 0303cfb

File tree

5 files changed

+173
-80
lines changed

5 files changed

+173
-80
lines changed

internal/pool/conn_stack.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ func (s *connStack) ShiftStale(idleTimeout time.Duration) *Conn {
3232
select {
3333
case <-s.free:
3434
s.mu.Lock()
35-
defer s.mu.Unlock()
36-
3735
if cn := s.cns[0]; cn.IsStale(idleTimeout) {
3836
copy(s.cns, s.cns[1:])
3937
s.cns = s.cns[:len(s.cns)-1]
38+
s.mu.Unlock()
4039
return cn
4140
}
41+
s.mu.Unlock()
4242

4343
s.free <- struct{}{}
4444
return nil

internal/pool/main_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package pool_test
2+
3+
import (
4+
"net"
5+
"sync"
6+
"testing"
7+
8+
. "github.com/onsi/ginkgo"
9+
. "github.com/onsi/gomega"
10+
)
11+
12+
func TestGinkgoSuite(t *testing.T) {
13+
RegisterFailHandler(Fail)
14+
RunSpecs(t, "pool")
15+
}
16+
17+
func perform(n int, cbs ...func(int)) {
18+
var wg sync.WaitGroup
19+
for _, cb := range cbs {
20+
for i := 0; i < n; i++ {
21+
wg.Add(1)
22+
go func(cb func(int), i int) {
23+
defer GinkgoRecover()
24+
defer wg.Done()
25+
26+
cb(i)
27+
}(cb, i)
28+
}
29+
}
30+
wg.Wait()
31+
}
32+
33+
func dummyDialer() (net.Conn, error) {
34+
return &net.TCPConn{}, nil
35+
}

internal/pool/pool.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"gopkg.in/bsm/ratelimit.v1"
1313
)
1414

15-
var Logger = log.New(os.Stderr, "pg: ", log.LstdFlags)
15+
var Logger = log.New(os.Stderr, "redis: ", log.LstdFlags)
1616

1717
var (
1818
ErrClosed = errors.New("redis: client is closed")
@@ -108,9 +108,9 @@ func (p *ConnPool) First() *Conn {
108108
}
109109

110110
// wait waits for free non-idle connection. It returns nil on timeout.
111-
func (p *ConnPool) wait() *Conn {
111+
func (p *ConnPool) wait(timeout time.Duration) *Conn {
112112
for {
113-
cn := p.freeConns.PopWithTimeout(p.poolTimeout)
113+
cn := p.freeConns.PopWithTimeout(timeout)
114114
if cn != nil && cn.IsStale(p.idleTimeout) {
115115
var err error
116116
cn, err = p.replace(cn)
@@ -175,7 +175,7 @@ func (p *ConnPool) Get() (*Conn, error) {
175175

176176
// Otherwise, wait for the available connection.
177177
atomic.AddUint32(&p.stats.Waits, 1)
178-
if cn := p.wait(); cn != nil {
178+
if cn := p.wait(p.poolTimeout); cn != nil {
179179
return cn, nil
180180
}
181181

@@ -270,8 +270,8 @@ func (p *ConnPool) Close() (retErr error) {
270270
}
271271

272272
// Wait for app to free connections, but don't close them immediately.
273-
for i := 0; i < p.Len(); i++ {
274-
if cn := p.wait(); cn == nil {
273+
for i := 0; i < p.Len()-p.FreeLen(); i++ {
274+
if cn := p.wait(3 * time.Second); cn == nil {
275275
break
276276
}
277277
}

internal/pool/pool_test.go

Lines changed: 130 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,93 @@ import (
1212
"gopkg.in/redis.v3/internal/pool"
1313
)
1414

15-
func TestGinkgoSuite(t *testing.T) {
16-
RegisterFailHandler(Fail)
17-
RunSpecs(t, "pool")
18-
}
15+
var _ = Describe("ConnPool", func() {
16+
var connPool *pool.ConnPool
17+
18+
BeforeEach(func() {
19+
pool.SetIdleCheckFrequency(time.Second)
20+
connPool = pool.NewConnPool(dummyDialer, 10, time.Hour, time.Second)
21+
})
22+
23+
AfterEach(func() {
24+
connPool.Close()
25+
})
26+
27+
It("rate limits dial", func() {
28+
var rateErr error
29+
for i := 0; i < 1000; i++ {
30+
cn, err := connPool.Get()
31+
if err != nil {
32+
rateErr = err
33+
break
34+
}
35+
36+
_ = connPool.Replace(cn, errors.New("test"))
37+
}
38+
39+
Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
40+
})
41+
42+
It("should unblock client when conn is removed", func() {
43+
// Reserve one connection.
44+
cn, err := connPool.Get()
45+
Expect(err).NotTo(HaveOccurred())
46+
47+
// Reserve all other connections.
48+
var cns []*pool.Conn
49+
for i := 0; i < 9; i++ {
50+
cn, err := connPool.Get()
51+
Expect(err).NotTo(HaveOccurred())
52+
cns = append(cns, cn)
53+
}
54+
55+
started := make(chan bool, 1)
56+
done := make(chan bool, 1)
57+
go func() {
58+
defer GinkgoRecover()
59+
60+
started <- true
61+
_, err := connPool.Get()
62+
Expect(err).NotTo(HaveOccurred())
63+
done <- true
64+
65+
err = connPool.Put(cn)
66+
Expect(err).NotTo(HaveOccurred())
67+
}()
68+
<-started
69+
70+
// Check that Get is blocked.
71+
select {
72+
case <-done:
73+
Fail("Get is not blocked")
74+
default:
75+
// ok
76+
}
77+
78+
err = connPool.Replace(cn, errors.New("test"))
79+
Expect(err).NotTo(HaveOccurred())
80+
81+
// Check that Ping is unblocked.
82+
select {
83+
case <-done:
84+
// ok
85+
case <-time.After(time.Second):
86+
Fail("Get is not unblocked")
87+
}
88+
89+
for _, cn := range cns {
90+
err = connPool.Put(cn)
91+
Expect(err).NotTo(HaveOccurred())
92+
}
93+
})
94+
})
1995

2096
var _ = Describe("conns reapser", func() {
2197
var connPool *pool.ConnPool
2298

2399
BeforeEach(func() {
24-
dial := func() (net.Conn, error) {
25-
return &net.TCPConn{}, nil
26-
}
27-
connPool = pool.NewConnPool(dial, 10, 0, time.Minute)
100+
pool.SetIdleCheckFrequency(time.Hour)
101+
connPool = pool.NewConnPool(dummyDialer, 10, 0, time.Minute)
28102

29103
// add stale connections
30104
for i := 0; i < 3; i++ {
@@ -49,6 +123,10 @@ var _ = Describe("conns reapser", func() {
49123
Expect(n).To(Equal(3))
50124
})
51125

126+
AfterEach(func() {
127+
connPool.Close()
128+
})
129+
52130
It("reaps stale connections", func() {
53131
Expect(connPool.Len()).To(Equal(3))
54132
Expect(connPool.FreeLen()).To(Equal(3))
@@ -92,3 +170,47 @@ var _ = Describe("conns reapser", func() {
92170
}
93171
})
94172
})
173+
174+
var _ = Describe("race", func() {
175+
var connPool *pool.ConnPool
176+
177+
var C, N = 10, 1000
178+
if testing.Short() {
179+
C = 4
180+
N = 100
181+
}
182+
183+
BeforeEach(func() {
184+
pool.SetIdleCheckFrequency(time.Second)
185+
connPool = pool.NewConnPool(dummyDialer, 10, time.Second, time.Second)
186+
})
187+
188+
AfterEach(func() {
189+
connPool.Close()
190+
})
191+
192+
It("does not happend", func() {
193+
perform(C, func(id int) {
194+
for i := 0; i < N; i++ {
195+
cn, err := connPool.Get()
196+
if err == nil {
197+
connPool.Put(cn)
198+
}
199+
}
200+
}, func(id int) {
201+
for i := 0; i < N; i++ {
202+
cn, err := connPool.Get()
203+
if err == nil {
204+
connPool.Replace(cn, errors.New("test"))
205+
}
206+
}
207+
}, func(id int) {
208+
for i := 0; i < N; i++ {
209+
cn, err := connPool.Get()
210+
if err == nil {
211+
connPool.Remove(cn, errors.New("test"))
212+
}
213+
}
214+
})
215+
})
216+
})

pool_test.go

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package redis_test
22

33
import (
4-
"errors"
5-
"time"
6-
74
. "github.com/onsi/ginkgo"
85
. "github.com/onsi/gomega"
96

@@ -131,65 +128,4 @@ var _ = Describe("pool", func() {
131128
Expect(stats.Waits).To(Equal(uint32(0)))
132129
Expect(stats.Timeouts).To(Equal(uint32(0)))
133130
})
134-
135-
It("should unblock client when connection is removed", func() {
136-
pool := client.Pool()
137-
138-
// Reserve one connection.
139-
cn, err := pool.Get()
140-
Expect(err).NotTo(HaveOccurred())
141-
142-
// Reserve the rest of connections.
143-
for i := 0; i < 9; i++ {
144-
_, err := pool.Get()
145-
Expect(err).NotTo(HaveOccurred())
146-
}
147-
148-
var ping *redis.StatusCmd
149-
started := make(chan bool, 1)
150-
done := make(chan bool, 1)
151-
go func() {
152-
started <- true
153-
ping = client.Ping()
154-
done <- true
155-
}()
156-
<-started
157-
158-
// Check that Ping is blocked.
159-
select {
160-
case <-done:
161-
panic("Ping is not blocked")
162-
default:
163-
// ok
164-
}
165-
166-
err = pool.Replace(cn, errors.New("test"))
167-
Expect(err).NotTo(HaveOccurred())
168-
169-
// Check that Ping is unblocked.
170-
select {
171-
case <-done:
172-
// ok
173-
case <-time.After(time.Second):
174-
panic("Ping is not unblocked")
175-
}
176-
Expect(ping.Err()).NotTo(HaveOccurred())
177-
})
178-
179-
It("should rate limit dial", func() {
180-
pool := client.Pool()
181-
182-
var rateErr error
183-
for i := 0; i < 1000; i++ {
184-
cn, err := pool.Get()
185-
if err != nil {
186-
rateErr = err
187-
break
188-
}
189-
190-
_ = pool.Replace(cn, errors.New("test"))
191-
}
192-
193-
Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
194-
})
195131
})

0 commit comments

Comments
 (0)