Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always refresh ring on topology changes and reconnections #1680

Merged
merged 7 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make GetHosts() query system.local
- remove Conn.localHostInfo() and add Session.hostInfoFromIter()
- add ringDescriber.getLocalHostInfo()
- remove local host from ringDescriber.getClusterPeerInfo() returned slice
- make ringDescriber.GetHosts() call ringDescriber.getLocalHostInfo() in addition to ringDescriber.getClusterPeerInfo()
  • Loading branch information
joao-r-reis committed Apr 11, 2023
commit 937f39c5e9b961335535ea4cfeb7b9e5f19b6e9b
21 changes: 4 additions & 17 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,6 +1651,10 @@ func (c *Conn) querySystemPeers(ctx context.Context, version cassVersion) *Iter
}
}

// querySystemLocal runs a query against system.local, restricted to the row
// whose key is 'local', on this connection and returns the resulting iterator.
func (c *Conn) querySystemLocal(ctx context.Context) *Iter {
	const stmt = "SELECT * FROM system.local WHERE key='local'"
	return c.query(ctx, stmt)
}

func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
const localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"

Expand Down Expand Up @@ -1721,23 +1725,6 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
}

// localHostInfo queries system.local over this connection, decodes the row
// into a HostInfo seeded with the connection's connect address and port, and
// hands the result to the session ring via addOrUpdate, returning whatever
// addOrUpdate returns.
//
// Errors from the query or from decoding the row are returned unchanged
// together with a nil HostInfo.
func (c *Conn) localHostInfo(ctx context.Context) (*HostInfo, error) {
	row, err := c.query(ctx, "SELECT * FROM system.local WHERE key='local'").rowMap()
	if err != nil {
		return nil, err
	}

	// RemoteAddr is only guaranteed to be a net.Addr; an unchecked assertion
	// to *net.TCPAddr panics for connections that are not TCP-backed. Start
	// from the configured session port and prefer the real remote port when
	// the address is a TCP address.
	port := c.session.cfg.Port
	if tcpAddr, ok := c.conn.RemoteAddr().(*net.TCPAddr); ok {
		port = tcpAddr.Port
	}

	// TODO(zariel): avoid doing this here
	host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.connectAddress, port: port})
	if err != nil {
		return nil, err
	}

	return c.session.ring.addOrUpdate(host), nil
}

var (
ErrQueryArgLength = errors.New("gocql: query argument length mismatch")
ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")
Expand Down
5 changes: 4 additions & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,14 @@ type connHost struct {

func (c *controlConn) setupConn(conn *Conn) error {
// we need up-to-date host info for the filterHost call below
host, err := conn.localHostInfo(context.TODO())
iter := conn.querySystemLocal(context.TODO())
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port)
if err != nil {
return err
}

host = c.session.ring.addOrUpdate(host)

if c.session.cfg.filterHost(host) {
return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
}
Expand Down
62 changes: 55 additions & 7 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,12 +559,54 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*
return host, nil
}

// hostInfoFromIter drains iter and builds a HostInfo from its first row.
//
// The row is decoded by hostInfoFromMap on top of a HostInfo pre-populated
// with connectAddress and defaultPort. It returns an error if reading the
// iterator fails, if the iterator produced no rows, or if decoding the row
// fails; in every error case the returned HostInfo is nil.
func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) {
	rowMaps, err := iter.SliceMap()
	switch {
	case err != nil:
		// TODO(zariel): make typed error
		return nil, err
	case len(rowMaps) == 0:
		return nil, errors.New("query returned 0 rows")
	}

	// Only the first row is used; any further rows are ignored.
	seed := &HostInfo{connectAddress: connectAddress, port: defaultPort}
	info, err := s.hostInfoFromMap(rowMaps[0], seed)
	if err != nil {
		return nil, err
	}
	return info, nil
}

// getLocalHostInfo asks the control connection for the system.local row and
// converts it into a HostInfo.
//
// It returns errNoControl when the session has no control connection or when
// withConnHost yields a nil iterator. Decoding failures are wrapped with a
// "could not retrieve local host info" prefix. The HostInfo is seeded with a
// nil connect address and the configured session port.
func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
	control := r.session.control
	if control == nil {
		return nil, errNoControl
	}

	localIter := control.withConnHost(func(ch *connHost) *Iter {
		return ch.conn.querySystemLocal(context.TODO())
	})
	if localIter == nil {
		return nil, errNoControl
	}

	info, err := r.session.hostInfoFromIter(localIter, nil, r.session.cfg.Port)
	if err == nil {
		return info, nil
	}
	return nil, fmt.Errorf("could not retrieve local host info: %w", err)
}

// Ask the control node for host info on all its known peers
func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
var hosts []*HostInfo
func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, error) {
if r.session.control == nil {
return nil, errNoControl
}

var peers []*HostInfo
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
hosts = append(hosts, ch.host)
return ch.conn.querySystemPeers(context.TODO(), ch.host.version)
return ch.conn.querySystemPeers(context.TODO(), localHost.version)
})

if iter == nil {
Expand All @@ -589,10 +631,10 @@ func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
continue
}

hosts = append(hosts, host)
peers = append(peers, host)
}

return hosts, nil
return peers, nil
}

// Return true if the host is a valid peer
Expand All @@ -609,11 +651,17 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
r.mu.Lock()
defer r.mu.Unlock()

hosts, err := r.getClusterPeerInfo()
localHost, err := r.getLocalHostInfo()
if err != nil {
return r.prevHosts, r.prevPartitioner, err
}

peerHosts, err := r.getClusterPeerInfo(localHost)
if err != nil {
return r.prevHosts, r.prevPartitioner, err
}

hosts := append([]*HostInfo{localHost}, peerHosts...)
var partitioner string
if len(hosts) > 0 {
partitioner = hosts[0].Partitioner()
Expand Down