change HostInfo.Peer to be an IP
When Cassandra returns host info to us, the peer address is defined as an
inet, both in the protocol for events and in the schema for peer
information.

Previously we stored this as a string and used it both to connect to hosts
and to index hosts by. This differs from how we handle user-supplied
address endpoints, where we keep the (potentially DNS) name as the peer
address. As a result we can end up with duplicate host pools and duplicate
host info in the ring.

Fix this by making everything rely on a host's address being an IP
address, rather than sometimes a DNS name and sometimes an IP.
Zariel committed Oct 24, 2016
1 parent 48c4691 commit e0a2f2c
Show file tree
Hide file tree
Showing 19 changed files with 367 additions and 332 deletions.
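To make the duplicate-pool problem described in the commit message concrete, here is a small standalone sketch (not part of this commit; the hostname and IP are made up): keying host pools by the raw user-supplied endpoint and by the IP reported in Cassandra events yields two entries for one node.

package main

import (
    "fmt"
    "net"
)

func main() {
    pools := make(map[string]bool)

    // Before this change: the user-supplied endpoint (possibly a DNS name)
    // was stored verbatim and used as the pool key.
    pools["cassandra-1.example.com"] = true

    // Events and system.peers report the node's inet address, so the same
    // node shows up again under its IP.
    ip := net.ParseIP("10.0.0.1")
    pools[ip.String()] = true

    fmt.Println(len(pools)) // 2 entries for what is really a single host
}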
29 changes: 21 additions & 8 deletions cassandra_test.go
@@ -62,11 +62,15 @@ func TestRingDiscovery(t *testing.T) {
}

session.pool.mu.RLock()
defer session.pool.mu.RUnlock()
size := len(session.pool.hostConnPools)
session.pool.mu.RUnlock()

if *clusterSize != size {
t.Fatalf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
for p, pool := range session.pool.hostConnPools {
t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String())

}
t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
}
}

@@ -573,7 +577,7 @@ func TestReconnection(t *testing.T) {
defer session.Close()

h := session.ring.allHosts()[0]
session.handleNodeDown(net.ParseIP(h.Peer()), h.Port())
session.handleNodeDown(h.Peer(), h.Port())

if h.State() != NodeDown {
t.Fatal("Host should be NodeDown but not.")
@@ -2477,17 +2481,26 @@ func TestSchemaReset(t *testing.T) {
}

func TestCreateSession_DontSwallowError(t *testing.T) {
t.Skip("This test is bad, and the resultant error from cassandra changes between versions")
cluster := createCluster()
cluster.ProtoVersion = 100
cluster.ProtoVersion = 0x100
session, err := cluster.CreateSession()
if err == nil {
session.Close()

t.Fatal("expected to get an error for unsupported protocol")
}
// TODO: we should get a distinct error type here which include the underlying
// cassandra error about the protocol version, for now check this here.
if !strings.Contains(err.Error(), "Invalid or unsupported protocol version") {
t.Fatalf(`expected to get error "unsupported protocol version" got: %q`, err)

if flagCassVersion.Major < 3 {
// TODO: we should get a distinct error type here which include the underlying
// cassandra error about the protocol version, for now check this here.
if !strings.Contains(err.Error(), "Invalid or unsupported protocol version") {
t.Fatalf(`expected to get error "unsupported protocol version" got: %q`, err)
}
} else {
if !strings.Contains(err.Error(), "unsupported response version") {
t.Fatalf(`expected to get error "unsupported response version" got: %q`, err)
}
}

}
16 changes: 15 additions & 1 deletion cluster.go
@@ -27,7 +27,13 @@ func (p PoolConfig) buildPool(session *Session) *policyConnPool {
// behavior to fit the most common use cases. Applications that require a
// different setup must implement their own cluster.
type ClusterConfig struct {
Hosts []string // addresses for the initial connections
// addresses for the initial connections. It is recommended to use the value set in
// the Cassandra config for broadcast_address or listen_address, an IP address, not
// a domain name. This is because events from Cassandra will use the configured IP
// address, which is used to index connected hosts. If the domain name specified
// resolves to more than 1 IP address then the driver may connect multiple times to
// the same host, and will not mark the node as down or up from events.
Hosts []string
CQLVersion string // CQL version (default: 3.0.0)
ProtoVersion int // version of the native protocol (default: 2)
Timeout time.Duration // connection timeout (default: 600ms)
@@ -100,6 +106,14 @@ type ClusterConfig struct {
}

// NewCluster generates a new config for the default cluster implementation.
//
// The supplied hosts are used to initially connect to the cluster, then the rest of
// the ring will be automatically discovered. It is recommended to use the value set in
// the Cassandra config for broadcast_address or listen_address, an IP address, not
// a domain name. This is because events from Cassandra will use the configured IP
// address, which is used to index connected hosts. If the domain name specified
// resolves to more than 1 IP address then the driver may connect multiple times to
// the same host, and will not mark the node as down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
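A minimal usage sketch of the recommendation in the comments above: seed the cluster with each node's broadcast_address/listen_address IP rather than a DNS name. The addresses and keyspace below are placeholders, not values from this commit.

package main

import "github.com/gocql/gocql"

func main() {
    // Contact points given as the nodes' configured IPs, so the hosts the
    // driver indexes from events match the initial endpoints.
    cluster := gocql.NewCluster("10.0.0.1", "10.0.0.2", "10.0.0.3")
    cluster.Keyspace = "example"

    session, err := cluster.CreateSession()
    if err != nil {
        panic(err)
    }
    defer session.Close()
}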
14 changes: 12 additions & 2 deletions conn.go
Expand Up @@ -152,8 +152,15 @@ type Conn struct {
}

// Connect establishes a connection to a Cassandra node.
func Connect(host *HostInfo, addr string, cfg *ConnConfig,
errorHandler ConnErrorHandler, session *Session) (*Conn, error) {
func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, session *Session) (*Conn, error) {
// TODO(zariel): remove these
if host == nil {
panic("host is nil")
} else if len(host.Peer()) == 0 {
panic("host missing peer ip address")
} else if host.Port() == 0 {
panic("host missing port")
}

var (
err error
@@ -164,6 +171,9 @@
Timeout: cfg.Timeout,
}

// TODO(zariel): handle ipv6 zone
addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String()

if cfg.tlsConfig != nil {
// the TLS config is safe to be reused by connections but it must not
// be modified after being used.
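As a standalone illustration of the address construction added to Connect above, net.TCPAddr.String produces the bracketed form required for IPv6 literals, which naive string concatenation would get wrong (zone handling remains a TODO in the driver code). The addresses here are examples only.

package main

import (
    "fmt"
    "net"
)

func main() {
    v4 := &net.TCPAddr{IP: net.ParseIP("10.0.0.1"), Port: 9042}
    v6 := &net.TCPAddr{IP: net.ParseIP("fe80::1"), Port: 9042}

    fmt.Println(v4.String()) // 10.0.0.1:9042
    fmt.Println(v6.String()) // [fe80::1]:9042
}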
14 changes: 10 additions & 4 deletions conn_test.go
Expand Up @@ -473,8 +473,7 @@ func TestStream0(t *testing.T) {
}
})

host := &HostInfo{peer: srv.Address}
conn, err := Connect(host, srv.Address, &ConnConfig{ProtoVersion: int(srv.protocol)}, errorHandler, nil)
conn, err := Connect(srv.host(), &ConnConfig{ProtoVersion: int(srv.protocol)}, errorHandler, nil)
if err != nil {
t.Fatal(err)
}
@@ -509,8 +508,7 @@ func TestConnClosedBlocked(t *testing.T) {
t.Log(err)
})

host := &HostInfo{peer: srv.Address}
conn, err := Connect(host, srv.Address, &ConnConfig{ProtoVersion: int(srv.protocol)}, errorHandler, nil)
conn, err := Connect(srv.host(), &ConnConfig{ProtoVersion: int(srv.protocol)}, errorHandler, nil)
if err != nil {
t.Fatal(err)
}
@@ -637,6 +635,14 @@ type TestServer struct {
closed bool
}

func (srv *TestServer) host() *HostInfo {
host, err := hostInfo(srv.Address, 9042)
if err != nil {
srv.t.Fatal(err)
}
return host
}

func (srv *TestServer) closeWatch() {
<-srv.ctx.Done()

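The srv.host() helper above relies on a hostInfo function to turn the test server's address into a *HostInfo. The sketch below is only an assumption about the resolution step such a helper performs (resolveFirstIP is an invented name, not the driver's API): parse an IP literal directly, otherwise resolve the DNS name and pick one address.

package main

import (
    "fmt"
    "net"
)

// resolveFirstIP is a hypothetical helper: it returns the address as an IP
// if it already is one, otherwise it resolves the name and takes the first
// result.
func resolveFirstIP(addr string) (net.IP, error) {
    if ip := net.ParseIP(addr); ip != nil {
        return ip, nil
    }
    ips, err := net.LookupIP(addr)
    if err != nil {
        return nil, err
    }
    return ips[0], nil
}

func main() {
    ip, err := resolveFirstIP("localhost")
    if err != nil {
        panic(err)
    }
    fmt.Println(ip)
}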
34 changes: 19 additions & 15 deletions connectionpool.go
Expand Up @@ -130,9 +130,10 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
// don't create a connection pool for a down host
continue
}
if _, exists := p.hostConnPools[host.Peer()]; exists {
ip := host.Peer().String()
if _, exists := p.hostConnPools[ip]; exists {
// still have this host, so don't remove it
delete(toRemove, host.Peer())
delete(toRemove, ip)
continue
}

Expand All @@ -155,7 +156,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
createCount--
if pool.Size() > 0 {
// add pool only if there are connections available
p.hostConnPools[pool.host.Peer()] = pool
p.hostConnPools[pool.host.Peer().String()] = pool
}
}

@@ -177,9 +178,10 @@ func (p *policyConnPool) Size() int {
return count
}

func (p *policyConnPool) getPool(addr string) (pool *hostConnPool, ok bool) {
func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
ip := host.Peer().String()
p.mu.RLock()
pool, ok = p.hostConnPools[addr]
pool, ok = p.hostConnPools[ip]
p.mu.RUnlock()
return
}
@@ -196,8 +198,9 @@ func (p *policyConnPool) Close() {
}

func (p *policyConnPool) addHost(host *HostInfo) {
ip := host.Peer().String()
p.mu.Lock()
pool, ok := p.hostConnPools[host.Peer()]
pool, ok := p.hostConnPools[ip]
if !ok {
pool = newHostConnPool(
p.session,
@@ -207,22 +210,23 @@
p.keyspace,
)

p.hostConnPools[host.Peer()] = pool
p.hostConnPools[ip] = pool
}
p.mu.Unlock()

pool.fill()
}

func (p *policyConnPool) removeHost(addr string) {
func (p *policyConnPool) removeHost(ip net.IP) {
k := ip.String()
p.mu.Lock()
pool, ok := p.hostConnPools[addr]
pool, ok := p.hostConnPools[k]
if !ok {
p.mu.Unlock()
return
}

delete(p.hostConnPools, addr)
delete(p.hostConnPools, k)
p.mu.Unlock()

go pool.Close()
@@ -234,10 +238,10 @@ func (p *policyConnPool) hostUp(host *HostInfo) {
p.addHost(host)
}

func (p *policyConnPool) hostDown(addr string) {
func (p *policyConnPool) hostDown(ip net.IP) {
// TODO(zariel): mark host as down so we can try to connect to it later, for
// now just treat it as removed.
p.removeHost(addr)
p.removeHost(ip)
}

// hostConnPool is a connection pool for a single host.
@@ -272,7 +276,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
session: session,
host: host,
port: port,
addr: JoinHostPort(host.Peer(), port),
addr: (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
conns: make([]*Conn, 0, size),
@@ -396,7 +400,7 @@ func (pool *hostConnPool) fill() {

// this is called with the connection pool mutex held, this call will
// then recursively try to lock it again. FIXME
go pool.session.handleNodeDown(net.ParseIP(pool.host.Peer()), pool.port)
go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
return
}

@@ -477,7 +481,7 @@ func (pool *hostConnPool) connect() (err error) {
// try to connect
var conn *Conn
for i := 0; i < maxAttempts; i++ {
conn, err = pool.session.connect(pool.addr, pool, pool.host)
conn, err = pool.session.connect(pool.host, pool)
if err == nil {
break
}
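One reason the pools above are keyed by ip.String() rather than a raw string conversion: string(ip) uses the underlying byte slice, which differs between the 4-byte and 16-byte representations of the same IPv4 address, while ip.String() is canonical. A small sketch, independent of the driver code:

package main

import (
    "fmt"
    "net"
)

func main() {
    a := net.ParseIP("10.0.0.1") // 16-byte (IPv4-in-IPv6) form
    b := a.To4()                 // 4-byte form of the same address

    fmt.Println(string(a) == string(b))   // false: raw bytes differ
    fmt.Println(a.String() == b.String()) // true: canonical text matches
}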
