Skip to content

Commit

Permalink
Return host status to the host selection policy
Browse files Browse the repository at this point in the history
The connection pool Pick function has been updated to return a
"SelectedHost" which contains helper methods for marking the status
of a host.
  • Loading branch information
dancannon committed Oct 27, 2015
1 parent 9935df5 commit 4c27e00
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 38 deletions.
8 changes: 4 additions & 4 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ func (p *policyConnPool) Size() int {
return count
}

func (p *policyConnPool) Pick(qry *Query) *Conn {
func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
nextHost := p.hostPolicy.Pick(qry)

var (
host *HostInfo
host SelectedHost
conn *Conn
)

Expand All @@ -185,10 +185,10 @@ func (p *policyConnPool) Pick(qry *Query) *Conn {
if host == nil {
break
}
conn = p.hostConnPools[host.Peer].Pick(qry)
conn = p.hostConnPools[host.Info().Peer].Pick(qry)
}
p.mu.RUnlock()
return conn
return host, conn
}

func (p *policyConnPool) Close() {
Expand Down
4 changes: 3 additions & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,19 @@ func (c *controlConn) reconnect(refreshring bool) {

// TODO: should have our own roundrobbin for hosts so that we can try each
// in succession and guantee that we get a different host each time.
conn := c.session.pool.Pick(nil)
host, conn := c.session.pool.Pick(nil)
if conn == nil {
return
}

newConn, err := Connect(conn.addr, conn.cfg, c)
if err != nil {
host.Mark(err)
// TODO: add log handler for things like this
return
}

host.Mark(nil)
c.conn.Store(newConn)
success = true

Expand Down
47 changes: 41 additions & 6 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ type HostSelectionPolicy interface {
Pick(*Query) NextHost
}

// SelectedHost is an interface returned when picking a host from a host
// selection policy.
type SelectedHost interface {
Info() *HostInfo
Mark(error)
}

// NextHost is an iteration function over picked hosts
type NextHost func() *HostInfo
type NextHost func() SelectedHost

// RoundRobinHostPolicy is a round-robin load balancing policy, where each host
// is tried sequentially for each query.
Expand Down Expand Up @@ -86,7 +93,7 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
// i is used to limit the number of attempts to find a host
// to the number of hosts known to this policy
var i uint32 = 0
return func() *HostInfo {
return func() SelectedHost {
r.mu.RLock()
if len(r.hosts) == 0 {
r.mu.RUnlock()
Expand All @@ -102,10 +109,24 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
i++
}
r.mu.RUnlock()
return host
return selectedRoundRobinHost{host}
}
}

// selectedRoundRobinHost is a host returned by the roundRobinHostPolicy and
// implements the SelectedHost interface
type selectedRoundRobinHost struct {
info *HostInfo
}

func (host selectedRoundRobinHost) Info() *HostInfo {
return host.info
}

func (host selectedRoundRobinHost) Mark(err error) {
// noop
}

// TokenAwareHostPolicy is a token aware host selection policy, where hosts are
// selected based on the partition key, so queries are sent to the host which
// owns the partition. Fallback is used when routing information is not available.
Expand Down Expand Up @@ -195,10 +216,10 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
hostReturned bool
fallbackIter NextHost
)
return func() *HostInfo {
return func() SelectedHost {
if !hostReturned {
hostReturned = true
return host
return selectedTokenAwareHost{host}
}

// fallback
Expand All @@ -209,14 +230,28 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost {
fallbackHost := fallbackIter()

// filter the token aware selected hosts from the fallback hosts
if fallbackHost == host {
if fallbackHost.Info() == host {
fallbackHost = fallbackIter()
}

return fallbackHost
}
}

// selectedTokenAwareHost is a host returned by the tokenAwareHostPolicy and
// implements the SelectedHost interface
type selectedTokenAwareHost struct {
info *HostInfo
}

func (host selectedTokenAwareHost) Info() *HostInfo {
return host.info
}

func (host selectedTokenAwareHost) Mark(err error) {
// noop
}

//ConnSelectionPolicy is an interface for selecting an
//appropriate connection for executing a query
type ConnSelectionPolicy interface {
Expand Down
48 changes: 24 additions & 24 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@ func TestRoundRobinHostPolicy(t *testing.T) {
// the first host selected is actually at [1], but this is ok for RR
// interleaved iteration should always increment the host
iterA := policy.Pick(nil)
if actual := iterA(); actual != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
if actual := iterA(); actual.Info() != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
}
iterB := policy.Pick(nil)
if actual := iterB(); actual != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
if actual := iterB(); actual.Info() != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
}
if actual := iterB(); actual != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
if actual := iterB(); actual.Info() != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
}
if actual := iterA(); actual != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
if actual := iterA(); actual.Info() != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
}

iterC := policy.Pick(nil)
if actual := iterC(); actual != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId)
if actual := iterC(); actual.Info() != &hosts[1] {
t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId)
}
if actual := iterC(); actual != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId)
if actual := iterC(); actual.Info() != &hosts[0] {
t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId)
}
}

Expand Down Expand Up @@ -70,32 +70,32 @@ func TestTokenAwareHostPolicy(t *testing.T) {

// the token ring is not setup without the partitioner, but the fallback
// should work
if actual := policy.Pick(nil)(); actual.Peer != "1" {
t.Errorf("Expected peer 1 but was %s", actual.Peer)
if actual := policy.Pick(nil)(); actual.Info().Peer != "1" {
t.Errorf("Expected peer 1 but was %s", actual.Info().Peer)
}

query.RoutingKey([]byte("30"))
if actual := policy.Pick(query)(); actual.Peer != "2" {
t.Errorf("Expected peer 2 but was %s", actual.Peer)
if actual := policy.Pick(query)(); actual.Info().Peer != "2" {
t.Errorf("Expected peer 2 but was %s", actual.Info().Peer)
}

policy.SetPartitioner("OrderedPartitioner")

// now the token ring is configured
query.RoutingKey([]byte("20"))
iter = policy.Pick(query)
if actual := iter(); actual.Peer != "1" {
t.Errorf("Expected peer 1 but was %s", actual.Peer)
if actual := iter(); actual.Info().Peer != "1" {
t.Errorf("Expected peer 1 but was %s", actual.Info().Peer)
}
// rest are round robin
if actual := iter(); actual.Peer != "3" {
t.Errorf("Expected peer 3 but was %s", actual.Peer)
if actual := iter(); actual.Info().Peer != "3" {
t.Errorf("Expected peer 3 but was %s", actual.Info().Peer)
}
if actual := iter(); actual.Peer != "0" {
t.Errorf("Expected peer 0 but was %s", actual.Peer)
if actual := iter(); actual.Info().Peer != "0" {
t.Errorf("Expected peer 0 but was %s", actual.Info().Peer)
}
if actual := iter(); actual.Peer != "2" {
t.Errorf("Expected peer 2 but was %s", actual.Peer)
if actual := iter(); actual.Info().Peer != "2" {
t.Errorf("Expected peer 2 but was %s", actual.Info().Peer)
}
}

Expand Down
18 changes: 15 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (s *Session) executeQuery(qry *Query) *Iter {
qry.attempts = 0
qry.totalLatency = 0
for {
conn := s.pool.Pick(qry)
host, conn := s.pool.Pick(qry)

//Assign the error unavailable to the iterator
if conn == nil {
Expand All @@ -254,9 +254,13 @@ func (s *Session) executeQuery(qry *Query) *Iter {

//Exit for loop if the query was successful
if iter.err == nil {
host.Mark(iter.err)
break
}

// Mark host as ok
host.Mark(nil)

if qry.rt == nil || !qry.rt.Attempt(qry) {
break
}
Expand Down Expand Up @@ -323,7 +327,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
)

// get the query info for the statement
conn := s.pool.Pick(nil)
host, conn := s.pool.Pick(nil)
if conn == nil {
// no connections
inflight.err = ErrNoConnections
Expand All @@ -336,9 +340,13 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
if inflight.err != nil {
// don't cache this error
s.routingKeyInfoCache.Remove(stmt)
host.Mark(inflight.err)
return nil, inflight.err
}

// Mark host as OK
host.Mark(nil)

if len(info.Args) == 0 {
// no arguments, no routing key, and no error
return nil, nil
Expand Down Expand Up @@ -418,7 +426,7 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) {
batch.attempts = 0
batch.totalLatency = 0
for {
conn := s.pool.Pick(nil)
host, conn := s.pool.Pick(nil)

//Assign the error unavailable and break loop
if conn == nil {
Expand All @@ -431,9 +439,13 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) {
batch.attempts++
//Exit loop if operation executed correctly
if err == nil {
host.Mark(err)
return iter, err
}

// Mark host as OK
host.Mark(nil)

if batch.rt == nil || !batch.rt.Attempt(batch) {
break
}
Expand Down

0 comments on commit 4c27e00

Please sign in to comment.