Skip to content

Commit

Permalink
use the port to connect to which is passed via the events
Browse files Browse the repository at this point in the history
  • Loading branch information
Zariel committed Dec 25, 2015
1 parent 6c44ff9 commit e824767
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 21 deletions.
2 changes: 1 addition & 1 deletion connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (p *policyConnPool) addHost(host *HostInfo) {
pool = newHostConnPool(
p.session,
host,
p.port,
host.Port(),
p.numConns,
p.keyspace,
p.connPolicy(),
Expand Down
5 changes: 4 additions & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ func (c *controlConn) fetchHostInfo(addr net.IP, port int) (*HostInfo, error) {
}
}

host := &HostInfo{}
host := &HostInfo{
port: port,
}

if err := fn(host); err != nil {
return nil, err
}
Expand Down
24 changes: 16 additions & 8 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,19 @@ func (s *Session) handleEvent(framer *framer) {

func (s *Session) handleNewNode(host net.IP, port int) {
// TODO(zariel): need to be able to filter discovered nodes
if s.control == nil {
return
}
log.Printf("new node host=%v port=%v\n", host, port)

var hostInfo *HostInfo
if s.control != nil {
var err error
hostInfo, err = s.control.fetchHostInfo(host, port)
if err != nil {
log.Printf("gocql: events: unable to fetch host info for %v: %v\n", host, err)
return
}

hostInfo, err := s.control.fetchHostInfo(host, port)
if err != nil {
log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
return
} else {
hostInfo = &HostInfo{peer: host.String(), port: port, state: NodeUp}
}

// should this handle token moving?
Expand All @@ -170,7 +175,10 @@ func (s *Session) handleNewNode(host net.IP, port int) {
}

s.pool.addHost(hostInfo)
s.hostSource.refreshRing()

if s.control != nil {
s.hostSource.refreshRing()
}
}

func (s *Session) handleRemovedNode(ip net.IP, port int) {
Expand Down
3 changes: 2 additions & 1 deletion events_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ func TestEventNodeDownControl(t *testing.T) {
time.Sleep(5 * time.Second)

session.pool.mu.RLock()
defer session.pool.mu.RUnlock()

poolHosts := session.pool.hostConnPools
node := status[targetNode]
log.Printf("poolhosts=%+v\n", poolHosts)

if _, ok := poolHosts[node.Addr]; ok {
session.pool.mu.RUnlock()
t.Fatal("node not removed after remove event")
}
session.pool.mu.RUnlock()
}

func TestEventNodeDown(t *testing.T) {
Expand Down
18 changes: 16 additions & 2 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type HostInfo struct {
// that we are thread safe use a mutex to access all fields.
mu sync.RWMutex
peer string
port int
dataCenter string
rack string
hostId string
Expand Down Expand Up @@ -172,6 +173,19 @@ func (h *HostInfo) setTokens(tokens []string) *HostInfo {
return h
}

func (h *HostInfo) Port() int {
h.mu.RLock()
defer h.mu.RUnlock()
return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
h.mu.Lock()
defer h.mu.Unlock()
h.port = port
return h
}

func (h *HostInfo) update(from *HostInfo) {
h.mu.Lock()
defer h.mu.Unlock()
Expand All @@ -186,10 +200,10 @@ func (h *HostInfo) IsUp() bool {
return h.State() == NodeUp
}

func (h HostInfo) String() string {
func (h *HostInfo) String() string {
h.mu.RLock()
defer h.mu.RUnlock()
return fmt.Sprintf("[hostinfo peer=%q data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// Polls system.peers at a specific interval to find new hosts
Expand Down
37 changes: 29 additions & 8 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -96,30 +99,48 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
}

s.pool = cfg.PoolConfig.buildPool(s)

var hosts []*HostInfo

if !cfg.disableControlConn {
s.control = createControlConn(s)
if err := s.control.connect(cfg.Hosts); err != nil {
s.Close()
return nil, err
}

// need to setup host source to check for broadcast_address in system.local
localHasRPCAddr, _ := checkSystemLocal(s.control)
s.hostSource.localHasRpcAddr = localHasRPCAddr
hosts, _, err := s.hostSource.GetHosts()
hosts, _, err = s.hostSource.GetHosts()
if err != nil {
s.control.close()
s.Close()
return nil, err
}

// TODO(zariel): this should be used to create initial metadata
s.pool.SetHosts(hosts)
} else {
// we dont get host info
hosts := make([]*HostInfo, len(cfg.Hosts))
for i, addr := range cfg.Hosts {
hosts[i] = &HostInfo{peer: addr}
hosts = make([]*HostInfo, len(cfg.Hosts))
for i, hostport := range cfg.Hosts {
// TODO: remove duplication
addr, portStr, err := net.SplitHostPort(JoinHostPort(hostport, cfg.Port))
if err != nil {
s.Close()
return nil, fmt.Errorf("NewSession: unable to parse hostport of addr %q: %v", hostport, err)
}

port, err := strconv.Atoi(portStr)
if err != nil {
s.Close()
return nil, fmt.Errorf("NewSession: invalid port for hostport of addr %q: %v", hostport, err)
}

hosts[i] = &HostInfo{peer: addr, port: port, state: NodeUp}
}
s.pool.SetHosts(hosts)
}

for _, host := range hosts {
s.handleNodeUp(net.ParseIP(host.Peer()), host.Port())
}

// TODO(zariel): we probably dont need this any more as we verify that we
Expand Down

0 comments on commit e824767

Please sign in to comment.