Skip to content

Commit

Permalink
policies: add simple DCAwareRoundRobin policy (apache#942)
Browse files Browse the repository at this point in the history
Add a simple host policy to select local dc hosts before remote hosts.
  • Loading branch information
Zariel authored Jul 23, 2017
1 parent 88748c7 commit 2f9cd61
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
78 changes: 78 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,81 @@ func (host selectedHostPoolHost) Mark(err error) {

host.hostR.Mark(err)
}

type dcAwareRR struct {
local string

mu sync.RWMutex
localHosts map[string]*HostInfo
remoteHosts map[string]*HostInfo
}

// DCAwareRoundRobbinPolicy is a host selection policies which will priorities and
// return hosts which are in the local datacentre before returning hosts in all
// other datercentres
func DCAwareRoundRobbinPolicy(localDC string) HostSelectionPolicy {
return &dcAwareRR{
local: localDC,
localHosts: make(map[string]*HostInfo),
remoteHosts: make(map[string]*HostInfo),
}
}

func (d *dcAwareRR) AddHost(host *HostInfo) {
d.mu.Lock()

if host.DataCenter() == d.local {
d.localHosts[host.HostID()] = host
} else {
d.remoteHosts[host.HostID()] = host
}

d.mu.Unlock()
}

func (d *dcAwareRR) RemoveHost(host *HostInfo) {
d.mu.Lock()

delete(d.localHosts, host.HostID())
delete(d.remoteHosts, host.HostID())

d.mu.Unlock()
}

func (d *dcAwareRR) HostUp(host *HostInfo) {
d.AddHost(host)
}

func (d *dcAwareRR) HostDown(host *HostInfo) {
d.RemoveHost(host)
}

func (d *dcAwareRR) SetPartitioner(p string) {}

func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
d.mu.RLock()

// TODO: this is O(len(hosts)) and requires calculating a full query plan for
// every query. On the other hand it is stupidly simply and provides random host
// order prefering local dcs over remote ones.
hosts := make([]*HostInfo, 0, len(d.localHosts)+len(d.remoteHosts))
for _, host := range d.localHosts {
hosts = append(hosts, host)
}
for _, host := range d.remoteHosts {
hosts = append(hosts, host)
}

d.mu.RUnlock()

return func() SelectedHost {
if len(hosts) == 0 {
return nil
}

host := hosts[0]
hosts = hosts[1:]

return (*selectedHost)(host)
}
}
17 changes: 17 additions & 0 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,20 @@ func TestExponentialBackoffPolicy(t *testing.T) {
}
}
}

func TestDCAwareRR(t *testing.T) {
p := DCAwareRoundRobbinPolicy("local")
p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"})
p.AddHost(&HostInfo{connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "remote"})

iter := p.Pick(nil)

h := iter()
if h.Info().DataCenter() != "local" {
t.Fatalf("expected to get local DC first, got %v", h.Info())
}
h = iter()
if h.Info().DataCenter() != "remote" {
t.Fatalf("expected to get remote DC, got %v", h.Info())
}
}

0 comments on commit 2f9cd61

Please sign in to comment.