Skip to content

Commit

Permalink
Redis Sentinel support.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jun 14, 2014
1 parent d92dc7c commit a042cdd
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 60 deletions.
4 changes: 2 additions & 2 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func ExamplePubSub() {
msg, err = pubsub.Receive()
fmt.Println(msg, err)

// Output: &{subscribe mychannel 1} <nil>
// &{mychannel hello} <nil>
// Output: subscribe: mychannel <nil>
// Message<mychannel: hello> <nil>
}

func ExampleScript() {
Expand Down
104 changes: 66 additions & 38 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ type pool interface {
Len() int
Size() int
Close() error
Filter(func(*conn) bool)
}

//------------------------------------------------------------------------------

type conn struct {
cn net.Conn
netcn net.Conn
rd reader
inUse bool

inUse bool
usedAt time.Time

readTimeout time.Duration
Expand All @@ -50,9 +51,8 @@ func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
if err != nil {
return nil, err
}

cn := &conn{
cn: netcn,
netcn: netcn,
}
cn.rd = bufio.NewReader(cn)
return cn, nil
Expand All @@ -61,24 +61,28 @@ func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {

func (cn *conn) Read(b []byte) (int, error) {
if cn.readTimeout != 0 {
cn.cn.SetReadDeadline(time.Now().Add(cn.readTimeout))
cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))
} else {
cn.cn.SetReadDeadline(zeroTime)
cn.netcn.SetReadDeadline(zeroTime)
}
return cn.cn.Read(b)
return cn.netcn.Read(b)
}

func (cn *conn) Write(b []byte) (int, error) {
if cn.writeTimeout != 0 {
cn.cn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))
} else {
cn.cn.SetWriteDeadline(zeroTime)
cn.netcn.SetWriteDeadline(zeroTime)
}
return cn.cn.Write(b)
return cn.netcn.Write(b)
}

func (cn *conn) RemoteAddr() net.Addr {
return cn.netcn.RemoteAddr()
}

func (cn *conn) Close() error {
return cn.cn.Close()
return cn.netcn.Close()
}

//------------------------------------------------------------------------------
Expand All @@ -87,30 +91,24 @@ type connPool struct {
dial func() (*conn, error)
rl *rateLimiter

opt *options

cond *sync.Cond
conns *list.List

idleNum int
maxSize int
idleTimeout time.Duration

closed bool
idleNum int
closed bool
}

func newConnPool(
dial func() (*conn, error),
maxSize int,
idleTimeout time.Duration,
) *connPool {
func newConnPool(dial func() (*conn, error), opt *options) *connPool {
return &connPool{
dial: dial,
rl: newRateLimiter(time.Second, 2*maxSize),
rl: newRateLimiter(time.Second, 2*opt.PoolSize),

opt: opt,

cond: sync.NewCond(&sync.Mutex{}),
conns: list.New(),

maxSize: maxSize,
idleTimeout: idleTimeout,
}
}

Expand All @@ -131,21 +129,21 @@ func (p *connPool) Get() (*conn, bool, error) {
return nil, false, errClosed
}

if p.idleTimeout > 0 {
if p.opt.IdleTimeout > 0 {
for el := p.conns.Front(); el != nil; el = el.Next() {
cn := el.Value.(*conn)
if cn.inUse {
break
}
if time.Since(cn.usedAt) > p.idleTimeout {
if time.Since(cn.usedAt) > p.opt.IdleTimeout {
if err := p.remove(cn); err != nil {
glog.Errorf("remove failed: %s", err)
}
}
}
}

for p.conns.Len() >= p.maxSize && p.idleNum == 0 {
for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
p.cond.Wait()
}

Expand All @@ -163,8 +161,8 @@ func (p *connPool) Get() (*conn, bool, error) {
return cn, false, nil
}

if p.conns.Len() < p.maxSize {
cn, err := p.new()
if p.conns.Len() < p.opt.PoolSize {
cn, err := p.dial()
if err != nil {
p.cond.L.Unlock()
return nil, false, err
Expand All @@ -187,7 +185,7 @@ func (p *connPool) Put(cn *conn) error {
return p.Remove(cn)
}

if p.idleTimeout > 0 {
if p.opt.IdleTimeout > 0 {
cn.usedAt = time.Now()
}

Expand Down Expand Up @@ -241,6 +239,18 @@ func (p *connPool) Size() int {
return p.conns.Len()
}

func (p *connPool) Filter(f func(*conn) bool) {
p.cond.L.Lock()
for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
next = el.Next()
cn := el.Value.(*conn)
if !f(cn) {
p.remove(cn)
}
}
p.cond.L.Unlock()
}

func (p *connPool) Close() error {
defer p.cond.L.Unlock()
p.cond.L.Lock()
Expand All @@ -249,7 +259,11 @@ func (p *connPool) Close() error {
}
p.closed = true
var retErr error
for e := p.conns.Front(); e != nil; e = e.Next() {
for {
e := p.conns.Front()
if e == nil {
break
}
if err := p.remove(e.Value.(*conn)); err != nil {
glog.Errorf("cn.Close failed: %s", err)
retErr = err
Expand Down Expand Up @@ -315,17 +329,24 @@ func (p *singleConnPool) Put(cn *conn) error {
}

func (p *singleConnPool) Remove(cn *conn) error {
defer p.l.Unlock()
p.l.Lock()
if p.cn == nil {
panic("p.cn == nil")
}
if p.cn != cn {
panic("p.cn != cn")
}
if p.closed {
p.l.Unlock()
return errClosed
}
return p.remove()
}

func (p *singleConnPool) remove() error {
err := p.pool.Remove(p.cn)
p.cn = nil
p.l.Unlock()
return nil
return err
}

func (p *singleConnPool) Len() int {
Expand All @@ -346,15 +367,23 @@ func (p *singleConnPool) Size() int {
return 1
}

func (p *singleConnPool) Filter(f func(*conn) bool) {
p.l.Lock()
if p.cn != nil {
if !f(p.cn) {
p.remove()
}
}
p.l.Unlock()
}

func (p *singleConnPool) Close() error {
defer p.l.Unlock()
p.l.Lock()

if p.closed {
return nil
}
p.closed = true

var err error
if p.cn != nil {
if p.reusable {
Expand All @@ -364,6 +393,5 @@ func (p *singleConnPool) Close() error {
}
}
p.cn = nil

return err
}
12 changes: 12 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,30 @@ type Message struct {
Payload string
}

func (m *Message) String() string {
return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
}

type PMessage struct {
Channel string
Pattern string
Payload string
}

func (m *PMessage) String() string {
return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload)
}

type Subscription struct {
Kind string
Channel string
Count int
}

func (m *Subscription) String() string {
return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
}

func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0)
}
Expand Down
59 changes: 39 additions & 20 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (

type baseClient struct {
connPool pool

opt *Options

cmds []Cmder
opt *options
cmds []Cmder
}

func (c *baseClient) writeCmd(cn *conn, cmds ...Cmder) error {
Expand Down Expand Up @@ -133,17 +131,29 @@ func (c *baseClient) Close() error {

//------------------------------------------------------------------------------

type options struct {
Password string
DB int64

DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration

PoolSize int
IdleTimeout time.Duration
}

type Options struct {
Addr string
Password string
DB int64

PoolSize int

DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration

PoolSize int
IdleTimeout time.Duration
}

func (opt *Options) getPoolSize() int {
Expand All @@ -160,32 +170,41 @@ func (opt *Options) getDialTimeout() time.Duration {
return opt.DialTimeout
}

//------------------------------------------------------------------------------
func (opt *Options) options() *options {
return &options{
DB: opt.DB,
Password: opt.Password,

DialTimeout: opt.getDialTimeout(),
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,

PoolSize: opt.getPoolSize(),
IdleTimeout: opt.IdleTimeout,
}
}

type Client struct {
*baseClient
}

func newClient(opt *Options, dial func() (net.Conn, error)) *Client {
func newClient(clOpt *Options, network string) *Client {
opt := clOpt.options()
dialer := func() (net.Conn, error) {
return net.DialTimeout(network, clOpt.Addr, opt.DialTimeout)
}
return &Client{
baseClient: &baseClient{
opt: opt,

connPool: newConnPool(newConnFunc(dial), opt.getPoolSize(), opt.IdleTimeout),
opt: opt,
connPool: newConnPool(newConnFunc(dialer), opt),
},
}
}

func NewTCPClient(opt *Options) *Client {
dial := func() (net.Conn, error) {
return net.DialTimeout("tcp", opt.Addr, opt.getDialTimeout())
}
return newClient(opt, dial)
return newClient(opt, "tcp")
}

func NewUnixClient(opt *Options) *Client {
dial := func() (net.Conn, error) {
return net.DialTimeout("unix", opt.Addr, opt.getDialTimeout())
}
return newClient(opt, dial)
return newClient(opt, "unix")
}
Loading

0 comments on commit a042cdd

Please sign in to comment.