diff --git a/connectionpool.go b/connectionpool.go
index 05b2415aa..8afef8730 100644
--- a/connectionpool.go
+++ b/connectionpool.go
@@ -229,7 +229,7 @@ func (p *policyConnPool) addHost(host *HostInfo) {
 		pool = newHostConnPool(
 			p.session,
 			host,
-			p.port,
+			host.Port(),
 			p.numConns,
 			p.keyspace,
 			p.connPolicy(),
diff --git a/control.go b/control.go
index 72949358b..012412a37 100644
--- a/control.go
+++ b/control.go
@@ -311,7 +311,10 @@ func (c *controlConn) fetchHostInfo(addr net.IP, port int) (*HostInfo, error) {
 		}
 	}
 
-	host := &HostInfo{}
+	host := &HostInfo{
+		port: port,
+	}
+
 	if err := fn(host); err != nil {
 		return nil, err
 	}
diff --git a/events.go b/events.go
index b49545346..9ebf35e0f 100644
--- a/events.go
+++ b/events.go
@@ -153,14 +153,19 @@ func (s *Session) handleEvent(framer *framer) {
 
 func (s *Session) handleNewNode(host net.IP, port int) {
 	// TODO(zariel): need to be able to filter discovered nodes
-	if s.control == nil {
-		return
-	}
+	log.Printf("new node host=%v port=%v\n", host, port)
+
+	var hostInfo *HostInfo
+	if s.control != nil {
+		var err error
+		hostInfo, err = s.control.fetchHostInfo(host, port)
+		if err != nil {
+			log.Printf("gocql: events: unable to fetch host info for %v: %v\n", host, err)
+			return
+		}
 
-	hostInfo, err := s.control.fetchHostInfo(host, port)
-	if err != nil {
-		log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
-		return
+	} else {
+		hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
 	}
 
 	// should this handle token moving?
@@ -170,7 +175,10 @@ func (s *Session) handleNewNode(host net.IP, port int) {
 	}
 
 	s.pool.addHost(hostInfo)
-	s.hostSource.refreshRing()
+
+	if s.control != nil {
+		s.hostSource.refreshRing()
+	}
 }
 
 func (s *Session) handleRemovedNode(ip net.IP, port int) {
diff --git a/events_ccm_test.go b/events_ccm_test.go
index 775c348f4..bbcdca906 100644
--- a/events_ccm_test.go
+++ b/events_ccm_test.go
@@ -61,15 +61,16 @@ func TestEventNodeDownControl(t *testing.T) {
 	time.Sleep(5 * time.Second)
 
 	session.pool.mu.RLock()
-	defer session.pool.mu.RUnlock()
 
 	poolHosts := session.pool.hostConnPools
 	node := status[targetNode]
 	log.Printf("poolhosts=%+v\n", poolHosts)
 
 	if _, ok := poolHosts[node.Addr]; ok {
+		session.pool.mu.RUnlock()
 		t.Fatal("node not removed after remove event")
 	}
+	session.pool.mu.RUnlock()
 }
 
 func TestEventNodeDown(t *testing.T) {
diff --git a/host_source.go b/host_source.go
index 05226164b..17bfab36d 100644
--- a/host_source.go
+++ b/host_source.go
@@ -62,6 +62,7 @@ type HostInfo struct {
 	// that we are thread safe use a mutex to access all fields.
 	mu         sync.RWMutex
 	peer       string
+	port       int
 	dataCenter string
 	rack       string
 	hostId     string
@@ -172,6 +173,19 @@ func (h *HostInfo) setTokens(tokens []string) *HostInfo {
 	return h
 }
 
+func (h *HostInfo) Port() int {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.port
+}
+
+func (h *HostInfo) setPort(port int) *HostInfo {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	h.port = port
+	return h
+}
+
 func (h *HostInfo) update(from *HostInfo) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
@@ -186,10 +200,10 @@ func (h *HostInfo) IsUp() bool {
 	return h.State() == NodeUp
 }
 
-func (h HostInfo) String() string {
+func (h *HostInfo) String() string {
 	h.mu.RLock()
 	defer h.mu.RUnlock()
-	return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
+	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
 }
 
 // Polls system.peers at a specific interval to find new hosts
diff --git a/session.go b/session.go
index 4d8c02c1a..92fe41fe8 100644
--- a/session.go
+++ b/session.go
@@ -10,6 +10,9 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log"
+	"net"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -96,30 +99,48 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	}
 
 	s.pool = cfg.PoolConfig.buildPool(s)
+
+	var hosts []*HostInfo
+
 	if !cfg.disableControlConn {
 		s.control = createControlConn(s)
 		if err := s.control.connect(cfg.Hosts); err != nil {
+			s.Close()
 			return nil, err
 		}
 
 		// need to setup host source to check for broadcast_address in system.local
 		localHasRPCAddr, _ := checkSystemLocal(s.control)
 		s.hostSource.localHasRpcAddr = localHasRPCAddr
 
-		hosts, _, err := s.hostSource.GetHosts()
+		hosts, _, err = s.hostSource.GetHosts()
 		if err != nil {
-			s.control.close()
+			s.Close()
 			return nil, err
 		}
-		// TODO(zariel): this should be used to create initial metadata
-		s.pool.SetHosts(hosts)
 	} else {
 		// we dont get host info
-		hosts := make([]*HostInfo, len(cfg.Hosts))
-		for i, addr := range cfg.Hosts {
-			hosts[i] = &HostInfo{peer: addr}
+		hosts = make([]*HostInfo, len(cfg.Hosts))
+		for i, hostport := range cfg.Hosts {
+			// TODO: remove duplication
+			addr, portStr, err := net.SplitHostPort(JoinHostPort(hostport, cfg.Port))
+			if err != nil {
+				s.Close()
+				return nil, fmt.Errorf("NewSession: unable to parse hostport of addr %q: %v", hostport, err)
+			}
+
+			port, err := strconv.Atoi(portStr)
+			if err != nil {
+				s.Close()
+				return nil, fmt.Errorf("NewSession: invalid port for hostport of addr %q: %v", hostport, err)
+			}
+
+			hosts[i] = &HostInfo{peer: addr, port: port, state: NodeUp}
 		}
-		s.pool.SetHosts(hosts)
+	}
+
+	for _, host := range hosts {
+		s.handleNodeUp(net.ParseIP(host.Peer()), host.Port())
 	}
 
 	// TODO(zariel): we probably dont need this any more as we verify that we
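
Note: the hostport parsing that the session.go hunk does inline (and flags with "// TODO: remove duplication") boils down to the standalone sketch below. parseHostPort is a hypothetical helper name, not part of this patch; it mirrors the fallback gocql's JoinHostPort performs, appending the default port only when the address does not already carry one.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// parseHostPort (hypothetical helper, not part of this patch) accepts either
// "host" or "host:port" and falls back to defaultPort when no port is given,
// mirroring the inline logic added to NewSession above.
func parseHostPort(hostport string, defaultPort int) (string, int, error) {
	// Same fallback gocql's JoinHostPort applies: only append the default
	// port when the address cannot already be split into host and port.
	if _, _, err := net.SplitHostPort(hostport); err != nil {
		hostport = net.JoinHostPort(hostport, strconv.Itoa(defaultPort))
	}

	addr, portStr, err := net.SplitHostPort(hostport)
	if err != nil {
		return "", 0, fmt.Errorf("unable to parse hostport %q: %v", hostport, err)
	}

	port, err := strconv.Atoi(portStr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid port in hostport %q: %v", hostport, err)
	}

	return addr, port, nil
}

func main() {
	for _, hp := range []string{"10.0.0.1", "10.0.0.1:9043", "::1"} {
		addr, port, err := parseHostPort(hp, 9042)
		fmt.Println(addr, port, err)
	}
}

Joining and then immediately splitting looks redundant, but it normalizes bare hosts, host:port pairs, and bracketed IPv6 literals through a single code path, which is why the patch duplicates that pattern rather than special-casing each input form.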