diff --git a/cluster.go b/cluster.go index bbb505b0e..6fbc774aa 100644 --- a/cluster.go +++ b/cluster.go @@ -1,25 +1,36 @@ package redis import ( - "errors" - "io" "math/rand" - "net" "strings" "sync" "sync/atomic" "time" ) +func removeDuplicates(slice []string) []string { + seen := make(map[string]struct{}, len(slice)) + for i := 0; i < len(slice); { + addr := slice[i] + if _, ok := seen[addr]; ok { + slice = append(slice[:i], slice[i+1:]...) + } else { + seen[addr] = struct{}{} + i++ + } + } + return slice +} + type ClusterClient struct { commandable - addrs map[string]struct{} + addrs []string slots [][]string slotsMx sync.RWMutex // protects slots & addrs cache - conns map[string]*Client - connsMx sync.Mutex // protects conns + clients map[string]*Client + clientsMx sync.RWMutex // protects clients opt *ClusterOptions @@ -28,95 +39,122 @@ type ClusterClient struct { // NewClusterClient initializes a new cluster-aware client using given options. // A list of seed addresses must be provided. -func NewClusterClient(opt *ClusterOptions) (*ClusterClient, error) { - addrs, err := opt.getAddrSet() - if err != nil { - return nil, err - } - +func NewClusterClient(opt *ClusterOptions) *ClusterClient { client := &ClusterClient{ - addrs: addrs, - conns: make(map[string]*Client), + addrs: opt.getAddrs(), + clients: make(map[string]*Client), opt: opt, _reload: 1, } client.commandable.process = client.process - client.reloadIfDue() go client.reaper(time.NewTicker(5 * time.Minute)) - return client, nil + return client } -// Close closes the cluster connection +// Close closes the cluster client. func (c *ClusterClient) Close() error { - c.slotsMx.Lock() - defer c.slotsMx.Unlock() - - return c.reset() + // TODO: close should make client unusable + c.setSlots(nil) + return nil } // ------------------------------------------------------------------------ -// Finds the current master address for a given hash slot -func (c *ClusterClient) getMasterAddrBySlot(hashSlot int) string { - if addrs := c.slots[hashSlot]; len(addrs) > 0 { - return addrs[0] +// getClient returns a Client for a given address. +func (c *ClusterClient) getClient(addr string) *Client { + c.clientsMx.RLock() + client, ok := c.clients[addr] + if ok { + c.clientsMx.RUnlock() + return client } - return "" -} + c.clientsMx.RUnlock() -// Returns a node's client for a given address -func (c *ClusterClient) getNodeClientByAddr(addr string) *Client { - c.connsMx.Lock() - client, ok := c.conns[addr] + c.clientsMx.Lock() + client, ok = c.clients[addr] if !ok { opt := c.opt.clientOptions() opt.Addr = addr client = NewTCPClient(opt) - c.conns[addr] = client + c.clients[addr] = client } - c.connsMx.Unlock() + c.clientsMx.Unlock() + return client } +// randomClient returns a Client for the first live node. +func (c *ClusterClient) randomClient() (client *Client, err error) { + for i := 0; i < 10; i++ { + n := rand.Intn(len(c.addrs)) + client = c.getClient(c.addrs[n]) + err = client.Ping().Err() + if err == nil { + return client, nil + } + } + return nil, err +} + // Process a command func (c *ClusterClient) process(cmd Cmder) { + var client *Client var ask bool c.reloadIfDue() - hashSlot := hashSlot(cmd.clusterKey()) + slot := hashSlot(cmd.clusterKey()) c.slotsMx.RLock() defer c.slotsMx.RUnlock() - tried := make(map[string]struct{}, len(c.addrs)) - addr := c.getMasterAddrBySlot(hashSlot) - for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { - tried[addr] = struct{}{} + addrs := c.slots[slot] + if len(addrs) > 0 { + // First address is master. + client = c.getClient(addrs[0]) + } else { + var err error + client, err = c.randomClient() + if err != nil { + cmd.setErr(err) + return + } + } + + // Index in the addrs slice pointing to the next replica. + replicaIndex := 1 - // Pick the connection, process request - conn := c.getNodeClientByAddr(addr) + for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ { if ask { - pipe := conn.Pipeline() + pipe := client.Pipeline() pipe.Process(NewCmd("ASKING")) pipe.Process(cmd) _, _ = pipe.Exec() ask = false } else { - conn.Process(cmd) + client.Process(cmd) } // If there is no (real) error, we are done! err := cmd.Err() - if err == nil || err == Nil { + if err == nil || err == Nil || err == TxFailedErr { return } - // On connection errors, pick a random, previosuly untried connection - // and request again. - if _, ok := err.(*net.OpError); ok || err == io.EOF { - if addr = c.findNextAddr(tried); addr == "" { - return + // On network errors try another node. + if isNetworkError(err) { + if replicaIndex < len(addrs) { + // Try next available replica. + client = c.getClient(addrs[replicaIndex]) + replicaIndex++ + cmd.reset() + continue + } else { + // Otherwise try random node. + client, err = c.randomClient() + if err != nil { + return + } } cmd.reset() continue @@ -131,11 +169,11 @@ func (c *ClusterClient) process(cmd Cmder) { // Handle MOVE and ASK redirections, return on any other error switch parts[0] { case "MOVED": - c.forceReload() - addr = parts[2] + c.scheduleReload() + client = c.getClient(parts[2]) case "ASK": ask = true - addr = parts[2] + client = c.getClient(parts[2]) default: return } @@ -143,82 +181,58 @@ func (c *ClusterClient) process(cmd Cmder) { } } -// Closes all connections and reloads slot cache, if due -func (c *ClusterClient) reloadIfDue() (err error) { - if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { - return +// Closes all clients and returns last error if there are any. +func (c *ClusterClient) resetClients() (err error) { + c.clientsMx.Lock() + for addr, client := range c.clients { + if e := client.Close(); e != nil { + err = e + } + delete(c.clients, addr) } + c.clientsMx.Unlock() + return err +} - var infos []ClusterSlotInfo - +func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - defer c.slotsMx.Unlock() - - // Try known addresses in random order (map interation order is random in Go) - // http://redis.io/topics/cluster-spec#clients-first-connection-and-handling-of-redirections - // https://github.com/antirez/redis-rb-cluster/blob/fd931ed/cluster.rb#L157 - for addr := range c.addrs { - c.reset() - infos, err = c.fetchClusterSlots(addr) - if err == nil { - c.update(infos) - break + c.slots = make([][]string, hashSlots) + for _, info := range slots { + for i := info.Start; i <= info.End; i++ { + c.slots[i] = info.Addrs } + c.addrs = append(c.addrs, info.Addrs...) } - return -} + c.addrs = removeDuplicates(c.addrs) + c.resetClients() -// Closes all connections and flushes slots cache -func (c *ClusterClient) reset() (err error) { - c.connsMx.Lock() - for addr, client := range c.conns { - if e := client.Close(); e != nil { - err = e - } - delete(c.conns, addr) - } - c.connsMx.Unlock() - c.slots = make([][]string, hashSlots) - return + c.slotsMx.Unlock() } -// Forces a cache reload on next request -func (c *ClusterClient) forceReload() { - atomic.StoreUint32(&c._reload, 1) -} +// Closes all connections and reloads slot cache, if due. +func (c *ClusterClient) reloadIfDue() (err error) { + if !atomic.CompareAndSwapUint32(&c._reload, 1, 0) { + return + } -// Find the next untried address -func (c *ClusterClient) findNextAddr(tried map[string]struct{}) string { - for addr := range c.addrs { - if _, ok := tried[addr]; !ok { - return addr - } + client, err := c.randomClient() + if err != nil { + return err } - return "" -} -// Fetch slot information -func (c *ClusterClient) fetchClusterSlots(addr string) ([]ClusterSlotInfo, error) { - opt := c.opt.clientOptions() - opt.Addr = addr - client := NewClient(opt) - defer client.Close() + slots, err := client.ClusterSlots().Result() + if err != nil { + return err + } + c.setSlots(slots) - return client.ClusterSlots().Result() + return nil } -// Update slot information, populate slots -func (c *ClusterClient) update(infos []ClusterSlotInfo) { - for _, info := range infos { - for i := info.Start; i <= info.End; i++ { - c.slots[i] = info.Addrs - } - - for _, addr := range info.Addrs { - c.addrs[addr] = struct{}{} - } - } +// Schedules slots reload on next request. +func (c *ClusterClient) scheduleReload() { + atomic.StoreUint32(&c._reload, 1) } // reaper closes idle connections to the cluster. @@ -237,8 +251,6 @@ func (c *ClusterClient) reaper(ticker *time.Ticker) { //------------------------------------------------------------------------------ -var errNoAddrs = errors.New("redis: no addresses") - type ClusterOptions struct { // A seed-list of host:port addresses of known cluster nodes Addrs []string @@ -278,17 +290,9 @@ func (opt *ClusterOptions) getMaxRedirects() int { return opt.MaxRedirects } -func (opt *ClusterOptions) getAddrSet() (map[string]struct{}, error) { - size := len(opt.Addrs) - if size < 1 { - return nil, errNoAddrs - } - - addrs := make(map[string]struct{}, size) - for _, addr := range opt.Addrs { - addrs[addr] = struct{}{} - } - return addrs, nil +func (opt *ClusterOptions) getAddrs() []string { + opt.Addrs = removeDuplicates(opt.Addrs) + return opt.Addrs } func (opt *ClusterOptions) clientOptions() *Options { diff --git a/cluster_client_test.go b/cluster_client_test.go deleted file mode 100644 index dbeb1b2fa..000000000 --- a/cluster_client_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package redis - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("ClusterClient", func() { - - var subject *ClusterClient - var populate = func() { - subject.reset() - subject.update([]ClusterSlotInfo{ - {0, 4095, []string{"127.0.0.1:7000", "127.0.0.1:7004"}}, - {12288, 16383, []string{"127.0.0.1:7003", "127.0.0.1:7007"}}, - {4096, 8191, []string{"127.0.0.1:7001", "127.0.0.1:7005"}}, - {8192, 12287, []string{"127.0.0.1:7002", "127.0.0.1:7006"}}, - }) - } - - BeforeEach(func() { - var err error - subject, err = NewClusterClient(&ClusterOptions{ - Addrs: []string{"127.0.0.1:6379", "127.0.0.1:7003", "127.0.0.1:7006"}, - }) - Expect(err).NotTo(HaveOccurred()) - }) - - AfterEach(func() { - subject.Close() - }) - - It("should initialize", func() { - Expect(subject.addrs).To(HaveLen(3)) - Expect(subject.slots).To(HaveLen(hashSlots)) - Expect(subject._reload).To(Equal(uint32(0))) - }) - - It("should update slots cache", func() { - populate() - Expect(subject.slots[0]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4095]).To(Equal([]string{"127.0.0.1:7000", "127.0.0.1:7004"})) - Expect(subject.slots[4096]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8191]).To(Equal([]string{"127.0.0.1:7001", "127.0.0.1:7005"})) - Expect(subject.slots[8192]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12287]).To(Equal([]string{"127.0.0.1:7002", "127.0.0.1:7006"})) - Expect(subject.slots[12288]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) - Expect(subject.slots[16383]).To(Equal([]string{"127.0.0.1:7003", "127.0.0.1:7007"})) - Expect(subject.addrs).To(Equal(map[string]struct{}{ - "127.0.0.1:6379": struct{}{}, - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7002": struct{}{}, - "127.0.0.1:7003": struct{}{}, - "127.0.0.1:7004": struct{}{}, - "127.0.0.1:7005": struct{}{}, - "127.0.0.1:7006": struct{}{}, - "127.0.0.1:7007": struct{}{}, - })) - }) - - It("should find next addresses", func() { - populate() - seen := map[string]struct{}{ - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7003": struct{}{}, - } - - addr := subject.findNextAddr(seen) - for addr != "" { - seen[addr] = struct{}{} - addr = subject.findNextAddr(seen) - } - Expect(subject.findNextAddr(seen)).To(Equal("")) - Expect(seen).To(Equal(map[string]struct{}{ - "127.0.0.1:6379": struct{}{}, - "127.0.0.1:7000": struct{}{}, - "127.0.0.1:7001": struct{}{}, - "127.0.0.1:7002": struct{}{}, - "127.0.0.1:7003": struct{}{}, - "127.0.0.1:7004": struct{}{}, - "127.0.0.1:7005": struct{}{}, - "127.0.0.1:7006": struct{}{}, - "127.0.0.1:7007": struct{}{}, - })) - }) - - It("should check if reload is due", func() { - subject._reload = 0 - Expect(subject._reload).To(Equal(uint32(0))) - subject.forceReload() - Expect(subject._reload).To(Equal(uint32(1))) - }) -}) diff --git a/cluster_test.go b/cluster_test.go index ce583add7..5d12cd34a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -148,11 +148,9 @@ var _ = Describe("Cluster", func() { var client *redis.ClusterClient BeforeEach(func() { - var err error - client, err = redis.NewClusterClient(&redis.ClusterOptions{ + client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: []string{"127.0.0.1:8220", "127.0.0.1:8221", "127.0.0.1:8222", "127.0.0.1:8223", "127.0.0.1:8224", "127.0.0.1:8225"}, }) - Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { diff --git a/error.go b/error.go index 667fffdc6..33159d5c7 100644 --- a/error.go +++ b/error.go @@ -2,6 +2,8 @@ package redis import ( "fmt" + "io" + "net" ) // Redis nil reply. @@ -21,3 +23,10 @@ func errorf(s string, args ...interface{}) redisError { func (err redisError) Error() string { return err.s } + +func isNetworkError(err error) bool { + if _, ok := err.(*net.OpError); ok || err == io.EOF { + return true + } + return false +}