Skip to content

Commit

Permalink
control: register event frames
Browse files Browse the repository at this point in the history
  • Loading branch information
Zariel committed Dec 25, 2015
1 parent ba1fc9f commit 91a3b24
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type controlConn struct {

conn atomic.Value

retry RetryPolicy
retry RetryPolicy
connCfg *ConnConfig

closeWg sync.WaitGroup
quit chan struct{}
Expand Down Expand Up @@ -87,6 +88,7 @@ func (c *controlConn) connect(endpoints []string) error {
if err != nil {
return err
}
c.connCfg = connCfg

// store that we are not connected so that reconnect wont happen if we error
atomic.StoreInt64(&c.connecting, -1)
Expand All @@ -102,6 +104,11 @@ func (c *controlConn) connect(endpoints []string) error {
continue
}

if err = c.registerEvents(conn); err != nil {
conn.Close()
continue
}

// we should fetch the initial ring here and update initial host data. So that
// when we return from here we have a ring topology ready to go.
break
Expand All @@ -120,6 +127,24 @@ func (c *controlConn) connect(endpoints []string) error {
return nil
}

func (c *controlConn) registerEvents(conn *Conn) error {
framer, err := conn.exec(&writeRegisterFrame{
events: []string{"TOPOLOGY_CHANGE", "STATUS_CHANGE", "STATUS_CHANGE"},
}, nil)
if err != nil {
return err
}

frame, err := framer.parseFrame()
if err != nil {
return err
} else if _, ok := frame.(*readyFrame); !ok {
return fmt.Errorf("unexpected frame in response to register: got %T: %v\n", frame, frame)
}

return nil
}

func (c *controlConn) reconnect(refreshring bool) {
if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
return
Expand Down Expand Up @@ -147,27 +172,23 @@ func (c *controlConn) reconnect(refreshring bool) {
return
}

newConn, err := Connect(conn.addr, conn.cfg, c, c.session)
newConn, err := Connect(conn.addr, c.connCfg, c, c.session)
if err != nil {
host.Mark(err)
// TODO: add log handler for things like this
return
}

frame, err := c.writeFrame(&writeRegisterFrame{
events: []string{"TOPOLOGY_CHANGE", "STATUS_CHANGE", "STATUS_CHANGE"},
})

if err != nil {
if err := c.registerEvents(newConn); err != nil {
host.Mark(err)
return
} else if _, ok := frame.(*readyFrame); !ok {
log.Printf("gocql: unexpected frame in response to register: got %T: %v\n", frame, frame)
// TODO: handle this case better
newConn.Close()
log.Printf("gocql: unable to register events: %v\n", err)
return
}

host.Mark(nil)
c.conn.Store(newConn)
host.Mark(nil)
success = true

if oldConn != nil {
Expand Down

0 comments on commit 91a3b24

Please sign in to comment.