Skip to content

Commit

Permalink
register for schema events
Browse files Browse the repository at this point in the history
Register for schema change events and clear the prepared statement cache
so we don't use stale schema information for queries.
  • Loading branch information
Zariel committed Jan 26, 2016
1 parent 83f9932 commit d8ea9d6
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 29 deletions.
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ type ClusterConfig struct {
DisableNodeStatusEvents bool
// disable registering for topology events (node added/removed/moved)
DisableTopologyEvents bool
// disable registering for schema events (keyspace/table/function removed/created/updated)
DisableSchemaEvents bool
}

// internal config for testing
Expand Down
3 changes: 3 additions & 0 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (c *controlConn) registerEvents(conn *Conn) error {
if !c.session.cfg.Events.DisableNodeStatusEvents {
events = append(events, "STATUS_CHANGE")
}
if !c.session.cfg.Events.DisableSchemaEvents {
events = append(events, "SCHEMA_CHANGE")
}

if len(events) == 0 {
return nil
Expand Down
59 changes: 31 additions & 28 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,37 @@ func (e *eventDeouncer) debounce(frame frame) {
e.mu.Unlock()
}

func (s *Session) handleEvent(framer *framer) {
// TODO(zariel): need to debounce events frames, and possible also events
defer framerPool.Put(framer)

frame, err := framer.parseFrame()
if err != nil {
// TODO: logger
log.Printf("gocql: unable to parse event frame: %v\n", err)
return
}

if debug {
log.Printf("gocql: handling frame: %v\n", frame)
}

// TODO: handle medatadata events
switch f := frame.(type) {
case *schemaChangeKeyspace, *schemaChangeFunction, *schemaChangeTable:
s.schemaEvents.debounce(frame)
case *topologyChangeEventFrame, *statusChangeEventFrame:
s.nodeEvents.debounce(frame)
default:
log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
}
}

func (s *Session) handleSchemaEvent(frames []frame) {
// for now we dont care about them, just reset the prepared statements
s.stmtsLRU.clear()
}

func (s *Session) handleNodeEvent(frames []frame) {
type nodeEvent struct {
change string
Expand Down Expand Up @@ -131,34 +162,6 @@ func (s *Session) handleNodeEvent(frames []frame) {
}
}

func (s *Session) handleEvent(framer *framer) {
// TODO(zariel): need to debounce events frames, and possible also events
defer framerPool.Put(framer)

frame, err := framer.parseFrame()
if err != nil {
// TODO: logger
log.Printf("gocql: unable to parse event frame: %v\n", err)
return
}

if debug {
log.Printf("gocql: handling frame: %v\n", frame)
}

// TODO: handle medatadata events
switch f := frame.(type) {
case *schemaChangeKeyspace:
case *schemaChangeFunction:
case *schemaChangeTable:
case *topologyChangeEventFrame, *statusChangeEventFrame:
s.nodeEvents.debounce(frame)
default:
log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
}

}

func (s *Session) handleNewNode(host net.IP, port int, waitForBinary bool) {
// TODO(zariel): need to be able to filter discovered nodes

Expand Down
4 changes: 3 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type Session struct {
control *controlConn

// event handlers
nodeEvents *eventDeouncer
nodeEvents *eventDeouncer
schemaEvents *eventDeouncer

// ring metadata
hosts []HostInfo
Expand Down Expand Up @@ -102,6 +103,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
s.connCfg = connCfg

s.nodeEvents = newEventDeouncer("NodeEvents", s.handleNodeEvent)
s.schemaEvents = newEventDeouncer("SchemaEvents", s.handleSchemaEvent)

s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)

Expand Down

0 comments on commit d8ea9d6

Please sign in to comment.