Skip to content

Commit

Permalink
ring: addHostIfMissing returns existing host
Browse files Browse the repository at this point in the history
If the ring already has the host that is being added, return the
existing one and add a method to update the host from another host.
  • Loading branch information
Zariel committed Dec 25, 2015
1 parent b13e734 commit 7c61628
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
14 changes: 9 additions & 5 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func (s *Session) handleNodeEvent(frames []frame) {
case *topologyChangeEventFrame:
event, ok := events[f.host.String()]
if !ok {
event = &nodeEvent{change: f.change, host: f.host}
event = &nodeEvent{change: f.change, host: f.host, port: f.port}
events[f.host.String()] = event
}
event.change = f.change

case *statusChangeEventFrame:
event, ok := events[f.host.String()]
if !ok {
event = &nodeEvent{change: f.change, host: f.host}
event = &nodeEvent{change: f.change, host: f.host, port: f.port}
events[f.host.String()] = event
}
event.change = f.change
Expand Down Expand Up @@ -169,19 +169,23 @@ func (s *Session) handleNewNode(host net.IP, port int) {
}

// should this handle token moving?
if !s.ring.addHostIfMissing(hostInfo) {
s.handleNodeUp(host, port)
return
if existing, ok := s.ring.addHostIfMissing(hostInfo); !ok {
log.Printf("already have host=%v existing=%v, updating\n", hostInfo, existing)
existing.update(hostInfo)
hostInfo = existing
}

s.pool.addHost(hostInfo)
s.hostSource.refreshRing()
}

func (s *Session) handleRemovedNode(ip net.IP, port int) {
// we remove all nodes but only add ones which pass the filter
addr := ip.String()
s.pool.removeHost(addr)
s.ring.removeHost(addr)

s.hostSource.refreshRing()
}

func (s *Session) handleNodeUp(ip net.IP, port int) {
Expand Down
24 changes: 18 additions & 6 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ func (h *HostInfo) setTokens(tokens []string) *HostInfo {
return h
}

func (h *HostInfo) update(from *HostInfo) {
h.mu.Lock()
defer h.mu.Unlock()

h.tokens = from.tokens
h.version = from.version
h.hostId = from.hostId
h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
return h.State() == NodeUp
}
Expand Down Expand Up @@ -284,7 +294,6 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e
}

func (r *ringDescriber) matchFilter(host *HostInfo) bool {

if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
return false
}
Expand All @@ -296,24 +305,27 @@ func (r *ringDescriber) matchFilter(host *HostInfo) bool {
return true
}

func (r *ringDescriber) refreshRing() {
func (r *ringDescriber) refreshRing() error {
// if we have 0 hosts this will return the previous list of hosts to
// attempt to reconnect to the cluster otherwise we would never find
// downed hosts again, could possibly have an optimisation to only
// try to add new hosts if GetHosts didnt error and the hosts didnt change.
hosts, partitioner, err := r.GetHosts()
if err != nil {
log.Println("RingDescriber: unable to get ring topology:", err)
return
return err
}

// TODO: move this to session
// TODO: handle removing hosts here
for _, h := range hosts {
if r.session.ring.addHostIfMissing(h) {
log.Println(h)
if host, ok := r.session.ring.addHostIfMissing(h); !ok {
r.session.pool.addHost(h)
// TODO: trigger OnUp/OnAdd
} else {
host.update(h)
}
}

r.session.pool.SetPartitioner(partitioner)
return nil
}
7 changes: 4 additions & 3 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,20 @@ func (r *ring) addHost(host *HostInfo) bool {
return ok
}

func (r *ring) addHostIfMissing(host *HostInfo) bool {
func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
r.mu.Lock()
if r.hosts == nil {
r.hosts = make(map[string]*HostInfo)
}

addr := host.Peer()
_, ok := r.hosts[addr]
existing, ok := r.hosts[addr]
if !ok {
r.hosts[addr] = host
existing = host
}
r.mu.Unlock()
return ok
return existing, ok
}

func (r *ring) removeHost(addr string) bool {
Expand Down

0 comments on commit 7c61628

Please sign in to comment.