diff --git a/connectionpool.go b/connectionpool.go index 4e61f3062..c34448326 100644 --- a/connectionpool.go +++ b/connectionpool.go @@ -211,6 +211,17 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) { } } +func (p *policyConnPool) InFlight() int { + p.mu.RLock() + count := 0 + for _, pool := range p.hostConnPools { + count += pool.InFlight() + } + p.mu.RUnlock() + + return count +} + func (p *policyConnPool) Size() int { p.mu.RLock() count := 0 @@ -348,6 +359,15 @@ func (pool *hostConnPool) Size() int { return size } +// InFlight returns the number of requests currently in flight in the pool +func (pool *hostConnPool) InFlight() int { + pool.mu.RLock() + defer pool.mu.RUnlock() + + size := pool.connPicker.InFlight() + return size +} + // Close the connection pool func (pool *hostConnPool) Close() { pool.mu.Lock() diff --git a/connpicker.go b/connpicker.go index af43d35c0..c6c65f7d3 100644 --- a/connpicker.go +++ b/connpicker.go @@ -10,6 +10,7 @@ type ConnPicker interface { Pick(token, string, string) *Conn Put(*Conn) Remove(conn *Conn) + InFlight() int Size() (int, int) Close() @@ -60,6 +61,11 @@ func (p *defaultConnPicker) Close() { } } +func (p *defaultConnPicker) InFlight() int { + size := len(p.conns) + return size +} + func (p *defaultConnPicker) Size() (int, int) { size := len(p.conns) return size, p.size - size @@ -114,6 +120,10 @@ func (nopConnPicker) Put(*Conn) { func (nopConnPicker) Remove(conn *Conn) { } +func (nopConnPicker) InFlight() int { + return 0 +} + func (nopConnPicker) Size() (int, int) { // Return 1 to make hostConnPool to try to establish a connection. 
// When first connection is established hostConnPool replaces nopConnPicker diff --git a/host_source.go b/host_source.go index 31132e38f..8dcf371ae 100644 --- a/host_source.go +++ b/host_source.go @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +func (h *HostInfo) IsBusy(s *Session) bool { + pool, ok := s.pool.getPool(h) + return ok && h != nil && pool.InFlight() >= MAX_IN_FLIGHT_THRESHOLD +} + func (h *HostInfo) HostnameAndPort() string { h.mu.Lock() defer h.mu.Unlock() diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 05bcd7d6a..7e502c2cc 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) { func (s *IDGenerator) Available() int { return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1 } + +func (s *IDGenerator) InUse() int { + return int(atomic.LoadInt32(&s.inuseStreams)) +} diff --git a/policies.go b/policies.go index 70ea00164..98867d2fa 100644 --- a/policies.go +++ b/policies.go @@ -391,6 +391,17 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) { } } +// AvoidSlowReplicas enables avoiding slow replicas +// +// TokenAwareHostPolicy normally does not check how busy a replica is; with avoidSlowReplicas enabled it avoids replicas +// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD requests in flight +func AvoidSlowReplicas(max_in_flight_threshold int) func(policy *tokenAwareHostPolicy) { + return func(t *tokenAwareHostPolicy) { + t.avoidSlowReplicas = true + MAX_IN_FLIGHT_THRESHOLD = max_in_flight_threshold + } +} + // NonLocalReplicasFallback enables fallback to replicas that are not considered local. 
// // TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then @@ -424,6 +435,8 @@ type clusterMeta struct { tokenRing *tokenRing } +var MAX_IN_FLIGHT_THRESHOLD int = 10 + type tokenAwareHostPolicy struct { fallback HostSelectionPolicy getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error) @@ -443,6 +456,8 @@ type tokenAwareHostPolicy struct { // Experimental, this interface and use may change tablets cowTabletList + + avoidSlowReplicas bool } func (t *tokenAwareHostPolicy) Init(s *Session) { @@ -687,6 +702,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { } } + if s := qry.GetSession(); s != nil && t.avoidSlowReplicas { + healthyReplicas := make([]*HostInfo, 0, len(replicas)) + unhealthyReplicas := make([]*HostInfo, 0, len(replicas)) + + for _, h := range replicas { + if h.IsBusy(s) { + unhealthyReplicas = append(unhealthyReplicas, h) + } else { + healthyReplicas = append(healthyReplicas, h) + } + } + + replicas = append(healthyReplicas, unhealthyReplicas...) + } + var ( fallbackIter NextHost i, j, k int diff --git a/scylla.go b/scylla.go index 7dece242a..49d14d55a 100644 --- a/scylla.go +++ b/scylla.go @@ -544,6 +544,16 @@ func (p *scyllaConnPicker) Remove(conn *Conn) { } } +func (p *scyllaConnPicker) InFlight() int { + result := 0 + for _, conn := range p.conns { + if conn != nil { + result = result + (conn.streams.InUse()) + } + } + return result +} + func (p *scyllaConnPicker) Size() (int, int) { return p.nrConns, p.nrShards - p.nrConns }