fix DCAwareRoundRobinPolicy host distribution #986

Merged · 1 commit · Oct 5, 2017
fix DCAwareRoundRobinPolicy host distribution
DCAwareRoundRobinPolicy currently picks the first host from a map of
all local and remote hosts. This tends to select the same host more
frequently than others, resulting in an uneven distribution of
operations across the cluster. Modify DCAwareRoundRobinPolicy to more
closely match RoundRobinHostPolicy by always advancing through the
hosts in a DC.
Ben Krebsbach committed Oct 5, 2017
commit a0a30600c109c6cb8a36d91f32fbdffee4c55863
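
The gist of the change, as a standalone sketch (simplified and outside gocql; the picker type and next function below are hypothetical illustrations, not the library's API): a counter shared by all iterators is incremented atomically on every pick, and the host index is that counter modulo the host count, so successive picks walk the list instead of restarting at the first host.

package main

import (
	"fmt"
	"sync/atomic"
)

type picker struct {
	pos   uint32
	hosts []string
}

// next advances the shared counter and maps it onto the host list, so
// every call, from any iterator, moves on to the following host.
func (p *picker) next() string {
	// AddUint32 returns the incremented value; subtract 1 to start at index 0.
	pos := atomic.AddUint32(&p.pos, 1) - 1
	return p.hosts[pos%uint32(len(p.hosts))]
}

func main() {
	p := &picker{hosts: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}}
	for i := 0; i < 6; i++ {
		fmt.Println(p.next()) // prints .1, .2, .3, then wraps around again
	}
}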
AUTHORS (1 change: 1 addition & 0 deletions)

@@ -94,3 +94,4 @@ Krishnanand Thommandra <devtkrishna@gmail.com>
 Blake Atkinson <me@blakeatkinson.com>
 Dharmendra Parsaila <d4dharmu@gmail.com>
 Nayef Ghattas <nayef.ghattas@datadoghq.com>
+Ben Krebsbach <ben.krebsbach@gmail.com>
policies.go (68 changes: 30 additions & 38 deletions)

@@ -540,43 +540,36 @@ func (host selectedHostPoolHost) Mark(err error) {
 }
 
 type dcAwareRR struct {
-	local string
-
+	local       string
+	pos         uint32
 	mu          sync.RWMutex
-	localHosts  map[string]*HostInfo
-	remoteHosts map[string]*HostInfo
+	localHosts  cowHostList
+	remoteHosts cowHostList
 }
 
-// DCAwareRoundRobinPolicy is a host selection policies which will priorities and
+// DCAwareRoundRobinPolicy is a host selection policies which will prioritize and
 // return hosts which are in the local datacentre before returning hosts in all
 // other datercentres
 func DCAwareRoundRobinPolicy(localDC string) HostSelectionPolicy {
 	return &dcAwareRR{
-		local:       localDC,
-		localHosts:  make(map[string]*HostInfo),
-		remoteHosts: make(map[string]*HostInfo),
+		local: localDC,
 	}
 }
 
 func (d *dcAwareRR) AddHost(host *HostInfo) {
-	d.mu.Lock()
-
 	if host.DataCenter() == d.local {
-		d.localHosts[host.HostID()] = host
+		d.localHosts.add(host)
 	} else {
-		d.remoteHosts[host.HostID()] = host
+		d.remoteHosts.add(host)
 	}
-
-	d.mu.Unlock()
 }
 
 func (d *dcAwareRR) RemoveHost(host *HostInfo) {
-	d.mu.Lock()
-
-	delete(d.localHosts, host.HostID())
-	delete(d.remoteHosts, host.HostID())
-
-	d.mu.Unlock()
+	if host.DataCenter() == d.local {
+		d.localHosts.remove(host.ConnectAddress())
+	} else {
+		d.remoteHosts.remove(host.ConnectAddress())
+	}
 }
 
 func (d *dcAwareRR) HostUp(host *HostInfo) {
@@ -590,29 +583,28 @@ func (d *dcAwareRR) HostDown(host *HostInfo) {
 func (d *dcAwareRR) SetPartitioner(p string) {}
 
 func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
-	d.mu.RLock()
-
-	// TODO: this is O(len(hosts)) and requires calculating a full query plan for
-	// every query. On the other hand it is stupidly simply and provides random host
-	// order prefering local dcs over remote ones.
-	hosts := make([]*HostInfo, 0, len(d.localHosts)+len(d.remoteHosts))
-	for _, host := range d.localHosts {
-		hosts = append(hosts, host)
-	}
-	for _, host := range d.remoteHosts {
-		hosts = append(hosts, host)
-	}
-
-	d.mu.RUnlock()
-
+	var i int
 	return func() SelectedHost {
+		var hosts []*HostInfo
+		localHosts := d.localHosts.get()
+		remoteHosts := d.remoteHosts.get()
+		if len(localHosts) != 0 {
+			hosts = localHosts
+		} else {
+			hosts = remoteHosts
+		}
 		if len(hosts) == 0 {
 			return nil
 		}
 
-		host := hosts[0]
-		hosts = hosts[1:]
-
+		// always increment pos to evenly distribute traffic in case of
+		// failures
+		pos := atomic.AddUint32(&d.pos, 1) - 1
+		if i >= len(localHosts)+len(remoteHosts) {
+			return nil
+		}
+		host := hosts[(pos)%uint32(len(hosts))]
+		i++
		return (*selectedHost)(host)
 	}
 }
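
Two counters cooperate in the new Pick: the shared atomic pos spreads interleaved iterators (and retries after failures) evenly across hosts, while the per-iterator i caps the number of hosts a single query plan returns so iteration terminates. Local hosts are always preferred; remote hosts are consulted only when no local hosts exist. For context, wiring the policy into a session looks roughly like this sketch (contact points, keyspace, and the DC name are placeholders, not values from this PR):

package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	// Placeholder contact points and keyspace for an example cluster.
	cluster := gocql.NewCluster("10.0.0.1", "10.0.0.2")
	cluster.Keyspace = "example"

	// Prefer hosts in the "local" datacenter, round-robining within it.
	cluster.PoolConfig.HostSelectionPolicy = gocql.DCAwareRoundRobinPolicy("local")

	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()
}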
policies_test.go (49 changes: 40 additions & 9 deletions)

@@ -304,17 +304,48 @@ func TestExponentialBackoffPolicy(t *testing.T) {
 
 func TestDCAwareRR(t *testing.T) {
 	p := DCAwareRoundRobinPolicy("local")
-	p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"})
-	p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "remote"})
 
-	iter := p.Pick(nil)
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
+		{hostId: "1", connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "local"},
+		{hostId: "2", connectAddress: net.ParseIP("10.0.0.3"), dataCenter: "remote"},
+		{hostId: "3", connectAddress: net.ParseIP("10.0.0.4"), dataCenter: "remote"},
+	}
 
-	h := iter()
-	if h.Info().DataCenter() != "local" {
-		t.Fatalf("expected to get local DC first, got %v", h.Info())
+	for _, host := range hosts {
+		p.AddHost(host)
 	}
-	h = iter()
-	if h.Info().DataCenter() != "remote" {
-		t.Fatalf("expected to get remote DC, got %v", h.Info())
+
+	// interleaved iteration should always increment the host
+	iterA := p.Pick(nil)
+	if actual := iterA(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
 	}
+	iterB := p.Pick(nil)
+	if actual := iterB(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterB(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterA(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	iterC := p.Pick(nil)
+	if actual := iterC(); actual.Info() != hosts[0] {
+		t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
+	}
+	p.RemoveHost(hosts[0])
+	if actual := iterC(); actual.Info() != hosts[1] {
+		t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
+	}
+	p.RemoveHost(hosts[1])
+	iterD := p.Pick(nil)
+	if actual := iterD(); actual.Info() != hosts[2] {
+		t.Errorf("Expected hosts[2] but was hosts[%s]", actual.Info().HostID())
+	}
+	if actual := iterD(); actual.Info() != hosts[3] {
+		t.Errorf("Expected hosts[3] but was hosts[%s]", actual.Info().HostID())
+	}
+
 }