Skip to content

Commit

Permalink
Add LOAD_BALANCING_POLICY_SLOW_AVOIDANCE funtionality
Browse files Browse the repository at this point in the history
The java driver has the feature to automatically avoid slow replicas
by doing simple heuristics. This is one of the key feature to have a
stable latency.

This commit adds additional field in tokenAwareHostPolicy to control
if the feature is enabled and what is the maximum in flight threshold.

If feature is enabled driver sorts the replicas to first try those
with less than specified maximum in flight connections.

Fixes: apache#154
  • Loading branch information
sylwiaszunejko committed Apr 29, 2024
1 parent 3c32c6c commit 26c61ef
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 0 deletions.
20 changes: 20 additions & 0 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
}
}

func (p *policyConnPool) InFlight() int {
p.mu.RLock()
count := 0
for _, pool := range p.hostConnPools {
count += pool.InFlight()
}
p.mu.RUnlock()

return count
}

func (p *policyConnPool) Size() int {
p.mu.RLock()
count := 0
Expand Down Expand Up @@ -348,6 +359,15 @@ func (pool *hostConnPool) Size() int {
return size
}

// Size returns the number of connections currently active in the pool
func (pool *hostConnPool) InFlight() int {
pool.mu.RLock()
defer pool.mu.RUnlock()

size := pool.connPicker.InFlight()
return size
}

// Close the connection pool
func (pool *hostConnPool) Close() {
pool.mu.Lock()
Expand Down
10 changes: 10 additions & 0 deletions connpicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type ConnPicker interface {
Pick(token, string, string) *Conn
Put(*Conn)
Remove(conn *Conn)
InFlight() int
Size() (int, int)
Close()

Expand Down Expand Up @@ -60,6 +61,11 @@ func (p *defaultConnPicker) Close() {
}
}

func (p *defaultConnPicker) InFlight() int {
size := len(p.conns)
return size
}

func (p *defaultConnPicker) Size() (int, int) {
size := len(p.conns)
return size, p.size - size
Expand Down Expand Up @@ -114,6 +120,10 @@ func (nopConnPicker) Put(*Conn) {
func (nopConnPicker) Remove(conn *Conn) {
}

func (nopConnPicker) InFlight() int {
return 0
}

func (nopConnPicker) Size() (int, int) {
// Return 1 to make hostConnPool to try to establish a connection.
// When first connection is established hostConnPool replaces nopConnPicker
Expand Down
5 changes: 5 additions & 0 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (h *HostInfo) IsUp() bool {
return h != nil && h.State() == NodeUp
}

func (h *HostInfo) IsBusy(s *Session) bool {
pool, ok := s.pool.getPool(h)
return ok && h != nil && pool.InFlight() >= MAX_IN_FLIGHT_THRESHOLD
}

func (h *HostInfo) HostnameAndPort() string {
h.mu.Lock()
defer h.mu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) {
func (s *IDGenerator) Available() int {
return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1
}

func (s *IDGenerator) InUse() int {
return int(atomic.LoadInt32(&s.inuseStreams))
}
30 changes: 30 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,17 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
}
}

// AvoidSlowReplicas enabled avoiding slow replicas
//
// TokenAwareHostPolicy normally does not check how busy replica is, with avoidSlowReplicas enabled it avoids replicas
// if they have equal or more than MAX_IN_FLIGHT_THRESHOLD requests in flight
func AvoidSlowReplicas(max_in_flight_threshold int) func(policy *tokenAwareHostPolicy) {
return func(t *tokenAwareHostPolicy) {
t.avoidSlowReplicas = true
MAX_IN_FLIGHT_THRESHOLD = max_in_flight_threshold
}
}

// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
//
// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then
Expand Down Expand Up @@ -424,6 +435,8 @@ type clusterMeta struct {
tokenRing *tokenRing
}

var MAX_IN_FLIGHT_THRESHOLD int = 10

type tokenAwareHostPolicy struct {
fallback HostSelectionPolicy
getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
Expand All @@ -443,6 +456,8 @@ type tokenAwareHostPolicy struct {

// Experimental, this interface and use may change
tablets cowTabletList

avoidSlowReplicas bool
}

func (t *tokenAwareHostPolicy) Init(s *Session) {
Expand Down Expand Up @@ -687,6 +702,21 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
}
}

if s := qry.GetSession(); s != nil && t.avoidSlowReplicas {
healthyReplicas := make([]*HostInfo, 0, len(replicas))
unhealthyReplicas := make([]*HostInfo, 0, len(replicas))

for _, h := range replicas {
if h.IsBusy(s) {
unhealthyReplicas = append(unhealthyReplicas, h)
} else {
healthyReplicas = append(healthyReplicas, h)
}
}

replicas = append(healthyReplicas, unhealthyReplicas...)
}

var (
fallbackIter NextHost
i, j, k int
Expand Down
10 changes: 10 additions & 0 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,16 @@ func (p *scyllaConnPicker) Remove(conn *Conn) {
}
}

func (p *scyllaConnPicker) InFlight() int {
result := 0
for _, conn := range p.conns {
if conn != nil {
result = result + (conn.streams.InUse())
}
}
return result
}

func (p *scyllaConnPicker) Size() (int, int) {
return p.nrConns, p.nrShards - p.nrConns
}
Expand Down

0 comments on commit 26c61ef

Please sign in to comment.