Pass by HostInfo instead of the peer address
Pass the full HostInfo to the connection pool after getting new
hosts, with the caveat that seed hosts won't have valid host info.
Zariel authored and Chris Bannister committed Aug 6, 2014
1 parent 3322078 commit 358e250
Showing 3 changed files with 86 additions and 57 deletions.
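
For orientation before the diff, a minimal standalone sketch of how seed hosts end up in the pool knowing only their peer address. The HostInfo mirror below copies the struct this commit adds to host_source.go; the seed addresses and the main function are illustrative only and not part of the commit.

package main

import "fmt"

// HostInfo mirrors the exported struct this commit adds to host_source.go.
type HostInfo struct {
	Peer       string
	DataCenter string
	Rack       string
	HostId     string
	Tokens     []string
}

func main() {
	// Seed hosts from the cluster config are plain addresses, so the pool can
	// only record the peer; DC, rack and tokens stay empty until the ring
	// describer fills them in via SetHosts (addresses here are made up).
	seeds := []string{"192.168.1.1", "192.168.1.2"}
	hosts := make(map[string]*HostInfo, len(seeds))
	for _, peer := range seeds {
		hosts[peer] = &HostInfo{Peer: peer}
	}
	fmt.Printf("%d seed hosts, topology unknown\n", len(hosts))
}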
1 change: 0 additions & 1 deletion cluster.go
@@ -111,7 +111,6 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
session: s,
dcFilter: cfg.Discovery.DcFilter,
rackFilter: cfg.Discovery.RackFilter,
previous: cfg.Hosts,
}

go hostSource.run(cfg.Discovery.Sleep)
62 changes: 45 additions & 17 deletions connectionpool.go
@@ -91,8 +91,7 @@ type ConnectionPool interface {
Size() int
HandleError(*Conn, error, bool)
Close()
AddHost(addr string)
RemoveHost(addr string)
SetHosts(host []HostInfo)
}

//NewPoolFunc is the type used by ClusterConfig to create a pool of a specific type.
@@ -107,9 +106,10 @@ type SimplePool struct {
connPool map[string]*RoundRobin
conns map[*Conn]struct{}
keyspace string
// current hosts

hostMu sync.RWMutex
hosts map[string]struct{}
// this is the set of current hosts which the pool will attempt to connect to
hosts map[string]*HostInfo

// protects hostpool, connPoll, conns, quit
mu sync.Mutex
@@ -132,11 +132,13 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
quitWait: make(chan bool),
cFillingPool: make(chan int, 1),
keyspace: cfg.Keyspace,
hosts: make(map[string]struct{}),
hosts: make(map[string]*HostInfo),
}

for _, host := range cfg.Hosts {
pool.hosts[host] = struct{}{}
// seed hosts have unknown topology
// TODO: Handle populating this during SetHosts
pool.hosts[host] = &HostInfo{Peer: host}
}

//Walk through connecting to hosts. As soon as one host connects
@@ -157,7 +159,7 @@ func NewSimplePool(cfg *ClusterConfig) ConnectionPool {
return pool
}

func (c *SimplePool) connect(addr string) error {
func (c *SimplePool) newConn(addr string) (*Conn, error) {
cfg := ConnConfig{
ProtoVersion: c.cfg.ProtoVersion,
CQLVersion: c.cfg.CQLVersion,
@@ -170,10 +172,20 @@ func (c *SimplePool) connect(addr string) error {

conn, err := Connect(addr, cfg, c)
if err != nil {
log.Printf("failed to connect to %q: %v", addr, err)
log.Printf("newConn: failed to connect to %q: %v", addr, err)
return nil, err
}

return conn, nil
}

func (c *SimplePool) connect(addr string) error {
if conn, err := c.newConn(addr); err != nil {
log.Printf("connect: failed to connect to %q: %v", addr, err)
return err
} else {
return c.addConn(conn)
}
return c.addConn(conn)
}

func (c *SimplePool) addConn(conn *Conn) error {
@@ -337,23 +349,39 @@ func (c *SimplePool) Close() {
})
}

func (c *SimplePool) AddHost(addr string) {
func (c *SimplePool) SetHosts(hosts []HostInfo) {
c.hostMu.Lock()
if _, ok := c.hosts[addr]; !ok {
c.hosts[addr] = struct{}{}
go c.fillPool()
toRemove := make(map[string]struct{})
for k := range c.hosts {
toRemove[k] = struct{}{}
}

for _, host := range hosts {
host := host

delete(toRemove, host.Peer)
// we already have it
if _, ok := c.hosts[host.Peer]; ok {
// TODO: Check rack, dc, token range is consistent, trigger topology change
// update stored host
continue
}

c.hosts[host.Peer] = &host
}

// can we hold c.mu whilst iterating this loop?
for addr := range toRemove {
c.removeHostLocked(addr)
}
c.hostMu.Unlock()
}

func (c *SimplePool) RemoveHost(addr string) {
c.hostMu.Lock()
func (c *SimplePool) removeHostLocked(addr string) {
if _, ok := c.hosts[addr]; !ok {
c.hostMu.Unlock()
return
}
delete(c.hosts, addr)
c.hostMu.Unlock()

c.mu.Lock()
defer c.mu.Unlock()
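To read the SetHosts bookkeeping without the pool's locking around it, here is a standalone sketch of the same reconcile step. The reconcile helper, the trimmed HostInfo mirror, and the addresses are assumptions made for illustration; only the add/remove logic follows the diff above.

package main

import "fmt"

// Trimmed mirror of the commit's HostInfo; only the fields this sketch needs.
type HostInfo struct {
	Peer       string
	DataCenter string
}

// reconcile mimics SetHosts: hosts absent from the latest ring view are
// dropped, unknown hosts are added, and known hosts are left alone.
func reconcile(current map[string]*HostInfo, latest []HostInfo) (removed []string) {
	toRemove := make(map[string]struct{}, len(current))
	for peer := range current {
		toRemove[peer] = struct{}{}
	}
	for _, h := range latest {
		h := h // copy before taking the address, as the diff does
		delete(toRemove, h.Peer)
		if _, ok := current[h.Peer]; ok {
			continue // already tracked; topology-change checks are still a TODO upstream
		}
		current[h.Peer] = &h
	}
	for peer := range toRemove {
		delete(current, peer)
		removed = append(removed, peer)
	}
	return removed
}

func main() {
	current := map[string]*HostInfo{"10.0.0.1": {Peer: "10.0.0.1"}}
	latest := []HostInfo{{Peer: "10.0.0.2", DataCenter: "dc1"}}
	fmt.Println(reconcile(current, latest)) // prints [10.0.0.1]
}
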
80 changes: 41 additions & 39 deletions host_source.go
@@ -1,42 +1,61 @@
package gocql

import "time"

type hostInfo struct {
peer string
dataCenter string
rack string
hostId string
tokens []string
import (
"net"
"time"
)

type HostInfo struct {
Peer string
DataCenter string
Rack string
HostId string
Tokens []string
}

// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
dcFilter string
rackFilter string
previous []string
previous []HostInfo
session *Session
}

func (r *ringDescriber) GetHosts() []string {
func (r *ringDescriber) GetHosts() []HostInfo {
// we need conn to be the same because we need to query system.peers and system.local
// on the same node to get the whole cluster
conn := r.session.Pool.Pick(nil)
if conn == nil {
return r.previous
}

// TODO: Get conn's tokens form system.local
query := r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
query := r.session.Query("SELECT data_center, rack, host_id, tokens FROM system.local")
iter := conn.executeQuery(query)

hosts := []string{conn.Address()}
host := hostInfo{}
host := &HostInfo{}
iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens)

for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
if r.matchFilter(&host) {
// TODO: Capture tokens
hosts = append(hosts, host.peer)
if err := iter.Close(); err != nil {
return r.previous
}

addr, _, err := net.SplitHostPort(conn.Address())
if err != nil {
// this should not happen, ever, as this is the address that was dialed by conn, here
// a panic makes sense, please report a bug if it occurs.
panic(err)
}

host.Peer = addr

hosts := []HostInfo{*host}

query = r.session.Query("SELECT peer, data_center, rack, host_id, tokens FROM system.peers")
iter = conn.executeQuery(query)

for iter.Scan(&host.Peer, &host.DataCenter, &host.Rack, &host.HostId, &host.Tokens) {
if r.matchFilter(host) {
hosts = append(hosts, *host)
}
}

@@ -49,16 +68,16 @@ func (r *ringDescriber) GetHosts() []string {
return hosts
}

func (r *ringDescriber) matchFilter(host *hostInfo) bool {
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
if r.dcFilter == "" && r.rackFilter == "" {
return true
}

if r.dcFilter != "" && r.dcFilter != host.dataCenter {
if r.dcFilter != "" && r.dcFilter != host.DataCenter {
return false
}

if r.rackFilter != "" && r.rackFilter != host.rack {
if r.rackFilter != "" && r.rackFilter != host.Rack {
return false
}

@@ -70,29 +89,12 @@ func (h *ringDescriber) run(sleep time.Duration) {
sleep = 30 * time.Second
}

prev := make(map[string]struct{})
for {
// if we have 0 hosts this will return the previous list of hosts to
// attempt to reconnect to the cluster otherwise we would never find
// downed hosts again, could possibly have an optimisation to only
// try to add new hosts if GetHosts didnt error and the hosts didnt change.
hosts := h.GetHosts()
current := make(map[string]struct{})
for _, host := range hosts {
if _, ok := prev[host]; !ok {
h.session.Pool.AddHost(host)
} else {
delete(prev, host)
}

current[host] = struct{}{}
}

for host := range prev {
h.session.Pool.RemoveHost(host)
}

prev = current
h.session.Pool.SetHosts(h.GetHosts())

time.Sleep(sleep)
}
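The port-stripping step in GetHosts is easy to miss, so here is a standalone sketch of why net.SplitHostPort is applied to conn.Address() before the address is stored in HostInfo.Peer: the dialed address carries a port, while system.peers reports bare IPs. The address literal below is made up.

package main

import (
	"fmt"
	"net"
)

func main() {
	// conn.Address() returns host:port (the address that was dialed), while
	// system.peers reports bare IPs, so GetHosts strips the port before use.
	addr := "192.168.1.1:9042"
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		// As in the commit: this address was successfully dialed, so a
		// failure to parse it indicates a bug rather than bad input.
		panic(err)
	}
	fmt.Println(host) // 192.168.1.1
}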
