Skip to content

Commit

Permalink
control: handle no hosts in connection pool
Browse files Browse the repository at this point in the history
When the control connection attempts to reconnect and there are no hosts
in the connection pool, fallback to trying the initial endpoints
shuffled again.

Set the host to up after the control connection connects to it.

Simplify the control connection reconnecting logic; it is threadsafe because
the only callers of the connect methods are the heartbeater and the
initial connection attempt.
  • Loading branch information
Zariel committed Jan 28, 2016
1 parent 4fd9e05 commit 8159193
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 65 deletions.
138 changes: 73 additions & 65 deletions control.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
package gocql

import (
crand "crypto/rand"
"errors"
"fmt"
"log"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
)

// randr is the package-level PRNG used to randomize endpoint order; it is
// seeded exactly once in init from crypto/rand so that independent driver
// instances do not all shuffle identically.
var randr *rand.Rand

func init() {
	var seed [4]byte
	if _, err := crand.Read(seed[:]); err != nil {
		// Without a proper seed every process would shuffle endpoints the
		// same way; treat a broken system RNG as fatal.
		panic(fmt.Sprintf("unable to seed random number generator: %v", err))
	}

	randr = rand.New(rand.NewSource(int64(readInt(seed[:]))))
}

// Ensure that the atomic variable is aligned to a 64bit boundary
// so that atomic operations can be applied on 32bit architectures.
//
// controlConn maintains the driver's dedicated control connection to the
// cluster, used for events, topology refreshes and heartbeats.
type controlConn struct {
	// connecting is manipulated with sync/atomic; it must remain the first
	// field so it is 64-bit aligned on 32-bit platforms (see comment above).
	connecting int64

	session *Session
	// conn holds the current connection; stored/loaded via atomic.Value so
	// readers never see a partially-swapped connection.
	conn atomic.Value

	retry RetryPolicy

	closeWg sync.WaitGroup
	// started is flipped from 0 to 1 with CAS when the heartbeat goroutine
	// begins, and to -1 on close — see heartBeat and close.
	started int32
	// quit signals the heartbeat goroutine to stop.
	quit chan struct{}
}

Expand All @@ -39,13 +50,17 @@ func createControlConn(session *Session) *controlConn {
}

func (c *controlConn) heartBeat() {
defer c.closeWg.Done()
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return
}

sleepTime := 1 * time.Second

for {
select {
case <-c.quit:
return
case <-time.After(5 * time.Second):
case <-time.After(sleepTime):
}

resp, err := c.writeFrame(&writeOptionsFrame{})
Expand All @@ -55,6 +70,8 @@ func (c *controlConn) heartBeat() {

switch resp.(type) {
case *supportedFrame:
// Everything ok
sleepTime = 5 * time.Second
continue
case error:
goto reconn
Expand All @@ -63,65 +80,79 @@ func (c *controlConn) heartBeat() {
}

reconn:
// try to connect a bit faster
sleepTime = 1 * time.Second
c.reconnect(true)
// time.Sleep(5 * time.Second)
continue
}
}

func (c *controlConn) connect(endpoints []string) error {
// initial connection attempt, try to connect to each endpoint to get an initial
// list of nodes.

// shuffle endpoints so not all drivers will connect to the same initial
// node.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
perm := r.Perm(len(endpoints))
func (c *controlConn) shuffleDial(endpoints []string) (conn *Conn, err error) {
perm := randr.Perm(len(endpoints))
shuffled := make([]string, len(endpoints))

for i, endpoint := range endpoints {
shuffled[perm[i]] = endpoint
}

// store that we are not connected so that reconnect wont happen if we error
atomic.StoreInt64(&c.connecting, -1)

var (
conn *Conn
err error
)

// shuffle endpoints so not all drivers will connect to the same initial
// node.
for _, addr := range shuffled {
conn, err = c.session.connect(JoinHostPort(addr, c.session.cfg.Port), c)
if err != nil {
log.Printf("gocql: unable to control conn dial %v: %v\n", addr, err)
continue
if err == nil {
return
}

if err = c.registerEvents(conn); err != nil {
conn.Close()
continue
}
log.Printf("gocql: unable to control conn dial %v: %v\n", addr, err)
}

// we should fetch the initial ring here and update initial host data. So that
// when we return from here we have a ring topology ready to go.
break
return
}

func (c *controlConn) connect(endpoints []string) error {
conn, err := c.shuffleDial(endpoints)
if err != nil {
return err
} else if conn == nil {
return errors.New("gocql: unable to connect to initial endpoints")
}

if conn == nil {
// this is fatal, not going to connect a session
if err := c.setupConn(conn); err != nil {
conn.Close()
return err
}

c.conn.Store(conn)
atomic.StoreInt64(&c.connecting, 0)
// we could fetch the initial ring here and update initial host data. So that
// when we return from here we have a ring topology ready to go.

c.closeWg.Add(1)
go c.heartBeat()

return nil
}

// setupConn registers the given connection for server-side events, publishes
// it as the active control connection, and asynchronously marks the remote
// host as up. On a registration failure the connection is closed and the
// error returned; address-parse failures leave the stored conn in place.
func (c *controlConn) setupConn(conn *Conn) error {
	if err := c.registerEvents(conn); err != nil {
		conn.Close()
		return err
	}

	c.conn.Store(conn)

	remoteAddr := conn.conn.RemoteAddr().String()
	host, portStr, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return err
	}

	port, err := strconv.Atoi(portStr)
	if err != nil {
		return err
	}

	// Mark the node up in the background so setup does not block on the
	// session's host-state machinery.
	go c.session.handleNodeUp(net.ParseIP(host), port, true)

	return nil
}

func (c *controlConn) registerEvents(conn *Conn) error {
var events []string

Expand Down Expand Up @@ -159,22 +190,6 @@ func (c *controlConn) registerEvents(conn *Conn) error {
func (c *controlConn) reconnect(refreshring bool) {
// TODO: simplify this function, use session.ring to get hosts instead of the
// connection pool
if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
return
}

success := false
defer func() {
// debounce reconnect a little
if success {
go func() {
time.Sleep(500 * time.Millisecond)
atomic.StoreInt64(&c.connecting, 0)
}()
} else {
atomic.StoreInt64(&c.connecting, 0)
}
}()

addr := c.addr()
oldConn := c.conn.Load().(*Conn)
Expand Down Expand Up @@ -202,10 +217,7 @@ func (c *controlConn) reconnect(refreshring bool) {
if newConn == nil {
_, conn := c.session.pool.Pick(nil)
if conn == nil {
return
}

if conn == nil {
c.connect(c.session.ring.endpoints)
return
}

Expand All @@ -217,16 +229,12 @@ func (c *controlConn) reconnect(refreshring bool) {
}
}

if err := c.registerEvents(newConn); err != nil {
// TODO: handle this case better
if err := c.setupConn(newConn); err != nil {
newConn.Close()
log.Printf("gocql: control unable to register events: %v\n", err)
return
}

c.conn.Store(newConn)
success = true

if refreshring {
c.session.hostSource.refreshRing()
}
Expand Down Expand Up @@ -355,9 +363,9 @@ func (c *controlConn) addr() string {
}

func (c *controlConn) close() {
// TODO: handle more gracefully
close(c.quit)
c.closeWg.Wait()
if atomic.CompareAndSwapInt32(&c.started, 1, -1) {
c.quit <- struct{}{}
}
conn := c.conn.Load().(*Conn)
if conn != nil {
conn.Close()
Expand Down
53 changes: 53 additions & 0 deletions events_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,56 @@ func TestEventFilter(t *testing.T) {
}

}

// TestEventDownQueryable verifies that after a node goes down and comes back
// up, the driver's pool re-marks the host as up and queries succeed again.
func TestEventDownQueryable(t *testing.T) {
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	const targetNode = "node1"
	addr := status[targetNode].Addr

	// Restrict the session to the single target node so the test observes
	// exactly that host's up/down transitions.
	cluster := createCluster()
	cluster.Hosts = []string{addr}
	cluster.HostFilter = WhiteListHostFilter(addr)

	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	// assertHostUp fails the test unless the target host is pooled and up.
	assertHostUp := func() {
		pool, ok := session.pool.getPool(addr)
		if !ok {
			t.Fatalf("should have %v in pool but dont", addr)
		}
		if !pool.host.IsUp() {
			t.Fatalf("host is not up %v", pool.host)
		}
	}

	assertHostUp()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}
	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}
	// Give the driver time to notice the node returning and restore the pool.
	time.Sleep(15 * time.Second)

	assertHostUp()

	var rows int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil {
		t.Fatal(err)
	} else if rows != 1 {
		t.Fatalf("expected to get 1 row got %d", rows)
	}
}
4 changes: 4 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (s *Session) Close() {
if s.nodeEvents != nil {
s.nodeEvents.stop()
}

if s.schemaEvents != nil {
s.schemaEvents.stop()
}
}

func (s *Session) Closed() bool {
Expand Down

0 comments on commit 8159193

Please sign in to comment.