fix DCAwareRoundRobinPolicy host distribution (apache#986)
DCAwareRoundRobinPolicy currently picks the first host from a map
of all local and remote hosts. This tends to pick the same host more
frequently than others, resulting in an uneven distribution of
operations across the cluster. Modify DCAwareRoundRobinPolicy to
more closely match RoundRobinHostPolicy by always incrementing the
position within a DC's host list.
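
In short, the fix keeps a per-policy counter and selects a host by indexing counter % len(hosts) into the DC's host list, so successive picks rotate through every host instead of repeatedly returning the first map entry. Below is a minimal, standalone sketch of that selection scheme; it is illustrative only, and the roundRobin type and next method are hypothetical names, not the gocql API.

package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a toy stand-in for a per-DC host list; the real policy keeps
// separate local and remote lists and prefers the local one.
type roundRobin struct {
	pos   uint32   // monotonically increasing pick counter
	hosts []string // host addresses in one datacenter
}

// next always advances pos, so traffic spreads evenly across hosts even when
// a caller abandons an iteration early (e.g. after the first healthy host).
func (r *roundRobin) next() string {
	pos := atomic.AddUint32(&r.pos, 1) - 1
	return r.hosts[pos%uint32(len(r.hosts))]
}

func main() {
	r := &roundRobin{hosts: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}}
	for i := 0; i < 6; i++ {
		fmt.Println(r.next()) // cycles .1, .2, .3, .1, .2, .3
	}
}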
bkrebsbach authored and Zariel committed Oct 5, 2017
1 parent ce5020a commit 808320e
Showing 3 changed files with 71 additions and 47 deletions.
1 change: 1 addition & 0 deletions AUTHORS
@@ -94,3 +94,4 @@ Krishnanand Thommandra <devtkrishna@gmail.com>
 Blake Atkinson <me@blakeatkinson.com>
 Dharmendra Parsaila <d4dharmu@gmail.com>
 Nayef Ghattas <nayef.ghattas@datadoghq.com>
+Ben Krebsbach <ben.krebsbach@gmail.com>
68 changes: 30 additions & 38 deletions policies.go
@@ -540,43 +540,36 @@ func (host selectedHostPoolHost) Mark(err error) {
 }
 
 type dcAwareRR struct {
-	local string
-
+	local       string
+	pos         uint32
 	mu          sync.RWMutex
-	localHosts  map[string]*HostInfo
-	remoteHosts map[string]*HostInfo
+	localHosts  cowHostList
+	remoteHosts cowHostList
 }
 
-// DCAwareRoundRobinPolicy is a host selection policies which will priorities and
+// DCAwareRoundRobinPolicy is a host selection policies which will prioritize and
 // return hosts which are in the local datacentre before returning hosts in all
 // other datercentres
 func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
 	return &dcAwareRR{
-		local:       localDC,
-		localHosts:  make(map[string]*HostInfo),
-		remoteHosts: make(map[string]*HostInfo),
+		local: localDC,
 	}
 }
 
 func (d *dcAwareRR) AddHost(host *HostInfo) {
-	d.mu.Lock()
-
 	if host.DataCenter() == d.local {
-		d.localHosts[host.HostID()] = host
+		d.localHosts.add(host)
 	} else {
-		d.remoteHosts[host.HostID()] = host
+		d.remoteHosts.add(host)
 	}
-
-	d.mu.Unlock()
 }
 
 func (d *dcAwareRR) RemoveHost(host *HostInfo) {
-	d.mu.Lock()
-
-	delete(d.localHosts, host.HostID())
-	delete(d.remoteHosts, host.HostID())
-
-	d.mu.Unlock()
+	if host.DataCenter() == d.local {
+		d.localHosts.remove(host.ConnectAddress())
+	} else {
+		d.remoteHosts.remove(host.ConnectAddress())
+	}
 }
 
 func (d *dcAwareRR) HostUp(host *HostInfo) {
@@ -590,29 +583,28 @@ func (d *dcAwareRR) HostDown(host *HostInfo) {
 func (d *dcAwareRR) SetPartitioner(p string) {}
 
 func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
-	d.mu.RLock()
-
-	// TODO: this is O(len(hosts)) and requires calculating a full query plan for
-	// every query. On the other hand it is stupidly simply and provides random host
-	// order prefering local dcs over remote ones.
-	hosts := make([]*HostInfo, 0, len(d.localHosts)+len(d.remoteHosts))
-	for _, host := range d.localHosts {
-		hosts = append(hosts, host)
-	}
-	for _, host := range d.remoteHosts {
-		hosts = append(hosts, host)
-	}
-
-	d.mu.RUnlock()
-
+	var i int
 	return func() SelectedHost {
+		var hosts []*HostInfo
+		localHosts := d.localHosts.get()
+		remoteHosts := d.remoteHosts.get()
+		if len(localHosts) != 0 {
+			hosts = localHosts
+		} else {
+			hosts = remoteHosts
+		}
 		if len(hosts) == 0 {
 			return nil
 		}
 
-		host := hosts[0]
-		hosts = hosts[1:]
-
+		// always increment pos to evenly distribute traffic in case of
+		// failures
+		pos := atomic.AddUint32(&d.pos, 1) - 1
+		if i >= len(localHosts)+len(remoteHosts) {
+			return nil
+		}
+		host := hosts[(pos)%uint32(len(hosts))]
+		i++
 		return (*selectedHost)(host)
 	}
 }
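
For completeness, here is how an application would opt into this policy; a usage sketch assuming the gocql cluster-configuration API of this period (the import path, contact points, and keyspace below are placeholders, not part of this commit).

package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("10.0.0.1", "10.0.0.2", "10.0.0.3")
	cluster.Keyspace = "example" // hypothetical keyspace name
	// Prefer hosts in the "local" datacenter; with this fix, queries rotate
	// evenly through that DC's hosts rather than favouring one of them.
	cluster.PoolConfig.HostSelectionPolicy = gocql.DCAwareRoundRobinPolicy("local")

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}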
49 changes: 40 additions & 9 deletions policies_test.go
@@ -304,17 +304,48 @@ func TestExponentialBackoffPolicy(t *testing.T) {
 
 func TestDCAwareRR(t *testing.T) {
 	p := DCAwareRoundRobinPolicy("local")
-	p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"})
-	p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "remote"})
 
-	iter := p.Pick(nil)
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
+		{hostId: "1", connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "local"},
+		{hostId: "2", connectAddress: net.ParseIP("10.0.0.3"), dataCenter: "remote"},
+		{hostId: "3", connectAddress: net.ParseIP("10.0.0.4"), dataCenter: "remote"},
+	}
 
-	h := iter()
-	if h.Info().DataCenter() != "local" {
-		t.Fatalf("expected to get local DC first, got %v", h.Info())
+	for _, host := range hosts {
+		p.AddHost(host)
 	}
-	h = iter()
-	if h.Info().DataCenter() != "remote" {
-		t.Fatalf("expected to get remote DC, got %v", h.Info())
+
+	// interleaved iteration should always increment the host
+	iterA := p.Pick(nil)
+	if actual := iterA(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
+	}
+	iterB := p.Pick(nil)
+	if actual := iterB(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterB(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterA(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	iterC := p.Pick(nil)
+	if actual := iterC(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
+	}
+	p.RemoveHost(hosts[0])
+	if actual := iterC(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	p.RemoveHost(hosts[1])
+	iterD := p.Pick(nil)
+	if actual := iterD(); actual.Info() != hosts[2] {
+		t.Errorf("Expected hosts[2] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterD(); actual.Info() != hosts[3] {
+		t.Errorf("Expected hosts[3] but was hosts[%s]", actual.Info().HostID())
+	}
 	}
+
 }
