From 4c27e0075d785d9412fddfe18aefc919b68eb7ec Mon Sep 17 00:00:00 2001 From: Daniel Cannon Date: Tue, 20 Oct 2015 16:57:43 +0100 Subject: [PATCH] Return host status to the host selection policy The connection pool Pick function has been updated to return a "SelectedHost" which contains helper methods for marking the status of a host. --- connectionpool.go | 8 ++++---- control.go | 4 +++- policies.go | 47 ++++++++++++++++++++++++++++++++++++++++------ policies_test.go | 48 +++++++++++++++++++++++------------------------ session.go | 18 +++++++++++++++--- 5 files changed, 87 insertions(+), 38 deletions(-) diff --git a/connectionpool.go b/connectionpool.go index 455f8c8f7..0e96a9328 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -171,11 +171,11 @@ func (p *policyConnPool) Size() int { return count } -func (p *policyConnPool) Pick(qry *Query) *Conn { +func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) { nextHost := p.hostPolicy.Pick(qry) var ( - host *HostInfo + host SelectedHost conn *Conn ) @@ -185,10 +185,10 @@ func (p *policyConnPool) Pick(qry *Query) *Conn { if host == nil { break } - conn = p.hostConnPools[host.Peer].Pick(qry) + conn = p.hostConnPools[host.Info().Peer].Pick(qry) } p.mu.RUnlock() - return conn + return host, conn } func (p *policyConnPool) Close() { diff --git a/control.go b/control.go index f894eb4a9..07a08334d 100644 --- a/control.go +++ b/control.go @@ -83,17 +83,19 @@ func (c *controlConn) reconnect(refreshring bool) { // TODO: should have our own roundrobbin for hosts so that we can try each // in succession and guantee that we get a different host each time. - conn := c.session.pool.Pick(nil) + host, conn := c.session.pool.Pick(nil) if conn == nil { return } newConn, err := Connect(conn.addr, conn.cfg, c) if err != nil { + host.Mark(err) // TODO: add log handler for things like this return } + host.Mark(nil) c.conn.Store(newConn) success = true diff --git a/policies.go b/policies.go index ed3e606cb..7b17cd5fa 100644 --- a/policies.go +++ b/policies.go @@ -57,8 +57,15 @@ type HostSelectionPolicy interface { Pick(*Query) NextHost } +// SelectedHost is an interface returned when picking a host from a host +// selection policy. +type SelectedHost interface { + Info() *HostInfo + Mark(error) +} + // NextHost is an iteration function over picked hosts -type NextHost func() *HostInfo +type NextHost func() SelectedHost // RoundRobinHostPolicy is a round-robin load balancing policy, where each host // is tried sequentially for each query. @@ -86,7 +93,7 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost { // i is used to limit the number of attempts to find a host // to the number of hosts known to this policy var i uint32 = 0 - return func() *HostInfo { + return func() SelectedHost { r.mu.RLock() if len(r.hosts) == 0 { r.mu.RUnlock() @@ -102,10 +109,24 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost { i++ } r.mu.RUnlock() - return host + return selectedRoundRobinHost{host} } } +// selectedRoundRobinHost is a host returned by the roundRobinHostPolicy and +// implements the SelectedHost interface +type selectedRoundRobinHost struct { + info *HostInfo +} + +func (host selectedRoundRobinHost) Info() *HostInfo { + return host.info +} + +func (host selectedRoundRobinHost) Mark(err error) { + // noop +} + // TokenAwareHostPolicy is a token aware host selection policy, where hosts are // selected based on the partition key, so queries are sent to the host which // owns the partition. Fallback is used when routing information is not available. @@ -195,10 +216,10 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost { hostReturned bool fallbackIter NextHost ) - return func() *HostInfo { + return func() SelectedHost { if !hostReturned { hostReturned = true - return host + return selectedTokenAwareHost{host} } // fallback @@ -209,7 +230,7 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost { fallbackHost := fallbackIter() // filter the token aware selected hosts from the fallback hosts - if fallbackHost == host { + if fallbackHost.Info() == host { fallbackHost = fallbackIter() } @@ -217,6 +238,20 @@ func (t *tokenAwareHostPolicy) Pick(qry *Query) NextHost { } } +// selectedTokenAwareHost is a host returned by the tokenAwareHostPolicy and +// implements the SelectedHost interface +type selectedTokenAwareHost struct { + info *HostInfo +} + +func (host selectedTokenAwareHost) Info() *HostInfo { + return host.info +} + +func (host selectedTokenAwareHost) Mark(err error) { + // noop +} + //ConnSelectionPolicy is an interface for selecting an //appropriate connection for executing a query type ConnSelectionPolicy interface { diff --git a/policies_test.go b/policies_test.go index f7c54c485..b6dc296df 100644 --- a/policies_test.go +++ b/policies_test.go @@ -20,26 +20,26 @@ func TestRoundRobinHostPolicy(t *testing.T) { // the first host selected is actually at [1], but this is ok for RR // interleaved iteration should always increment the host iterA := policy.Pick(nil) - if actual := iterA(); actual != &hosts[1] { - t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId) + if actual := iterA(); actual.Info() != &hosts[1] { + t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId) } iterB := policy.Pick(nil) - if actual := iterB(); actual != &hosts[0] { - t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId) + if actual := iterB(); actual.Info() != &hosts[0] { + t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId) } - if actual := iterB(); actual != &hosts[1] { - t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId) + if actual := iterB(); actual.Info() != &hosts[1] { + t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId) } - if actual := iterA(); actual != &hosts[0] { - t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId) + if actual := iterA(); actual.Info() != &hosts[0] { + t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId) } iterC := policy.Pick(nil) - if actual := iterC(); actual != &hosts[1] { - t.Errorf("Expected hosts[1] but was hosts[%s]", actual.HostId) + if actual := iterC(); actual.Info() != &hosts[1] { + t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostId) } - if actual := iterC(); actual != &hosts[0] { - t.Errorf("Expected hosts[0] but was hosts[%s]", actual.HostId) + if actual := iterC(); actual.Info() != &hosts[0] { + t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostId) } } @@ -70,13 +70,13 @@ func TestTokenAwareHostPolicy(t *testing.T) { // the token ring is not setup without the partitioner, but the fallback // should work - if actual := policy.Pick(nil)(); actual.Peer != "1" { - t.Errorf("Expected peer 1 but was %s", actual.Peer) + if actual := policy.Pick(nil)(); actual.Info().Peer != "1" { + t.Errorf("Expected peer 1 but was %s", actual.Info().Peer) } query.RoutingKey([]byte("30")) - if actual := policy.Pick(query)(); actual.Peer != "2" { - t.Errorf("Expected peer 2 but was %s", actual.Peer) + if actual := policy.Pick(query)(); actual.Info().Peer != "2" { + t.Errorf("Expected peer 2 but was %s", actual.Info().Peer) } policy.SetPartitioner("OrderedPartitioner") @@ -84,18 +84,18 @@ func TestTokenAwareHostPolicy(t *testing.T) { // now the token ring is configured query.RoutingKey([]byte("20")) iter = policy.Pick(query) - if actual := iter(); actual.Peer != "1" { - t.Errorf("Expected peer 1 but was %s", actual.Peer) + if actual := iter(); actual.Info().Peer != "1" { + t.Errorf("Expected peer 1 but was %s", actual.Info().Peer) } // rest are round robin - if actual := iter(); actual.Peer != "3" { - t.Errorf("Expected peer 3 but was %s", actual.Peer) + if actual := iter(); actual.Info().Peer != "3" { + t.Errorf("Expected peer 3 but was %s", actual.Info().Peer) } - if actual := iter(); actual.Peer != "0" { - t.Errorf("Expected peer 0 but was %s", actual.Peer) + if actual := iter(); actual.Info().Peer != "0" { + t.Errorf("Expected peer 0 but was %s", actual.Info().Peer) } - if actual := iter(); actual.Peer != "2" { - t.Errorf("Expected peer 2 but was %s", actual.Peer) + if actual := iter(); actual.Info().Peer != "2" { + t.Errorf("Expected peer 2 but was %s", actual.Info().Peer) } } diff --git a/session.go b/session.go index 371c98386..36ff5251a 100644 --- a/session.go +++ b/session.go @@ -235,7 +235,7 @@ func (s *Session) executeQuery(qry *Query) *Iter { qry.attempts = 0 qry.totalLatency = 0 for { - conn := s.pool.Pick(qry) + host, conn := s.pool.Pick(qry) //Assign the error unavailable to the iterator if conn == nil { @@ -254,9 +254,13 @@ func (s *Session) executeQuery(qry *Query) *Iter { //Exit for loop if the query was successful if iter.err == nil { + host.Mark(iter.err) break } + // Mark host as ok + host.Mark(nil) + if qry.rt == nil || !qry.rt.Attempt(qry) { break } @@ -323,7 +327,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) { ) // get the query info for the statement - conn := s.pool.Pick(nil) + host, conn := s.pool.Pick(nil) if conn == nil { // no connections inflight.err = ErrNoConnections @@ -336,9 +340,13 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) { if inflight.err != nil { // don't cache this error s.routingKeyInfoCache.Remove(stmt) + host.Mark(inflight.err) return nil, inflight.err } + // Mark host as OK + host.Mark(nil) + if len(info.Args) == 0 { // no arguments, no routing key, and no error return nil, nil @@ -418,7 +426,7 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) { batch.attempts = 0 batch.totalLatency = 0 for { - conn := s.pool.Pick(nil) + host, conn := s.pool.Pick(nil) //Assign the error unavailable and break loop if conn == nil { @@ -431,9 +439,13 @@ func (s *Session) executeBatch(batch *Batch) (*Iter, error) { batch.attempts++ //Exit loop if operation executed correctly if err == nil { + host.Mark(err) return iter, err } + // Mark host as OK + host.Mark(nil) + if batch.rt == nil || !batch.rt.Attempt(batch) { break }