From 81e62c537364b2dc71167cc168c36b081efe5c39 Mon Sep 17 00:00:00 2001 From: Venkat Date: Tue, 24 Mar 2020 19:05:00 -0700 Subject: [PATCH] ringpop: update hashring immediately on ring change (#3130) --- common/membership/rpMonitor_test.go | 27 ++++++++ common/membership/rpServiceResolver.go | 90 ++++++++++++++++++-------- 2 files changed, 91 insertions(+), 26 deletions(-) diff --git a/common/membership/rpMonitor_test.go b/common/membership/rpMonitor_test.go index 3b015f3174a..31d75a5c3fb 100644 --- a/common/membership/rpMonitor_test.go +++ b/common/membership/rpMonitor_test.go @@ -84,3 +84,30 @@ func (s *RpoSuite) TestRingpopMonitor() { rpm.Stop() testService.Stop() } + +func (s *RpoSuite) TestCompareMembers() { + s.testCompareMembers([]string{}, []string{"a"}, true) + s.testCompareMembers([]string{}, []string{"a", "b"}, true) + s.testCompareMembers([]string{"a"}, []string{"a", "b"}, true) + s.testCompareMembers([]string{}, []string{"a"}, true) + s.testCompareMembers([]string{}, []string{"a", "b"}, true) + s.testCompareMembers([]string{}, []string{}, false) + s.testCompareMembers([]string{"a"}, []string{"a"}, false) + s.testCompareMembers([]string{"a", "b"}, []string{"a", "b"}, false) +} + +func (s *RpoSuite) testCompareMembers(curr []string, new []string, hasDiff bool) { + resolver := &ringpopServiceResolver{} + currMembers := make(map[string]struct{}, len(curr)) + for _, m := range curr { + currMembers[m] = struct{}{} + } + resolver.membersMap = currMembers + newMembers, changed := resolver.compareMembers(new) + s.Equal(hasDiff, changed) + s.Equal(len(new), len(newMembers)) + for _, m := range new { + _, ok := newMembers[m] + s.True(ok) + } +} diff --git a/common/membership/rpServiceResolver.go b/common/membership/rpServiceResolver.go index 3650e964123..c4923d10f67 100644 --- a/common/membership/rpServiceResolver.go +++ b/common/membership/rpServiceResolver.go @@ -53,9 +53,11 @@ type ringpopServiceResolver struct { shutdownWG sync.WaitGroup logger log.Logger - ringLock sync.RWMutex - ringLastRefreshTime time.Time - ring *hashring.HashRing + ringValue atomic.Value // this stores the current hashring + + refreshLock sync.Mutex + lastRefreshTime time.Time + membersMap map[string]struct{} // for de-duping change notifications listenerLock sync.RWMutex listeners map[string]chan<- *ChangedEvent @@ -69,18 +71,22 @@ func newRingpopServiceResolver( logger log.Logger, ) *ringpopServiceResolver { - return &ringpopServiceResolver{ + resolver := &ringpopServiceResolver{ status: common.DaemonStatusInitialized, service: service, rp: rp, refreshChan: make(chan struct{}), shutdownCh: make(chan struct{}), logger: logger.WithTags(tag.ComponentServiceResolver, tag.Service(service)), - - ring: hashring.New(farm.Fingerprint32, replicaPoints), - - listeners: make(map[string]chan<- *ChangedEvent), + membersMap: make(map[string]struct{}), + listeners: make(map[string]chan<- *ChangedEvent), } + resolver.ringValue.Store(newHashRing()) + return resolver +} + +func newHashRing() *hashring.HashRing { + return hashring.New(farm.Fingerprint32, replicaPoints) } // Start starts the oracle @@ -112,12 +118,10 @@ func (r *ringpopServiceResolver) Stop() { return } - r.ringLock.Lock() - defer r.ringLock.Unlock() r.listenerLock.Lock() defer r.listenerLock.Unlock() r.rp.RemoveListener(r) - r.ring = hashring.New(farm.Fingerprint32, replicaPoints) + r.ringValue.Store(newHashRing()) r.listeners = make(map[string]chan<- *ChangedEvent) close(r.shutdownCh) @@ -131,9 +135,7 @@ func (r *ringpopServiceResolver) Lookup( key string, ) (*HostInfo, error) { - r.ringLock.RLock() - defer r.ringLock.RUnlock() - addr, found := r.ring.Lookup(key) + addr, found := r.ring().Lookup(key) if !found { select { case r.refreshChan <- struct{}{}: @@ -174,12 +176,12 @@ func (r *ringpopServiceResolver) RemoveListener( } func (r *ringpopServiceResolver) MemberCount() int { - return r.ring.ServerCount() + return r.ring().ServerCount() } func (r *ringpopServiceResolver) Members() []*HostInfo { var servers []*HostInfo - for _, s := range r.ring.Servers() { + for _, s := range r.ring().Servers() { servers = append(servers, NewHostInfo(s, r.getLabelsMap())) } @@ -206,28 +208,42 @@ func (r *ringpopServiceResolver) HandleEvent( } func (r *ringpopServiceResolver) refresh() error { - r.ringLock.Lock() - defer r.ringLock.Unlock() + r.refreshLock.Lock() + defer r.refreshLock.Unlock() + return r.refreshNoLock() +} - if r.ringLastRefreshTime.After(time.Now().Add(-minRefreshInternal)) { +func (r *ringpopServiceResolver) refreshWithBackoff() error { + r.refreshLock.Lock() + defer r.refreshLock.Unlock() + if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) { // refresh too frequently return nil } + return r.refreshNoLock() +} - r.ring = hashring.New(farm.Fingerprint32, replicaPoints) - +func (r *ringpopServiceResolver) refreshNoLock() error { addrs, err := r.rp.GetReachableMembers(swim.MemberWithLabelAndValue(RoleKey, r.service)) if err != nil { return err } + newMembersMap, changed := r.compareMembers(addrs) + if !changed { + return nil + } + + ring := newHashRing() for _, addr := range addrs { host := NewHostInfo(addr, r.getLabelsMap()) - r.ring.AddMembers(host) + ring.AddMembers(host) } - r.ringLastRefreshTime = time.Now() - r.logger.Debug("Current reachable members", tag.Addresses(addrs)) + r.membersMap = newMembersMap + r.lastRefreshTime = time.Now() + r.ringValue.Store(ring) + r.logger.Info("Current reachable members", tag.Addresses(addrs)) return nil } @@ -271,19 +287,41 @@ func (r *ringpopServiceResolver) refreshRingWorker() { case <-r.shutdownCh: return case <-r.refreshChan: - if err := r.refresh(); err != nil { + if err := r.refreshWithBackoff(); err != nil { r.logger.Error("error periodically refreshing ring", tag.Error(err)) } case <-refreshTicker.C: - if err := r.refresh(); err != nil { + if err := r.refreshWithBackoff(); err != nil { r.logger.Error("error periodically refreshing ring", tag.Error(err)) } } } } +func (r *ringpopServiceResolver) ring() *hashring.HashRing { + return r.ringValue.Load().(*hashring.HashRing) +} + func (r *ringpopServiceResolver) getLabelsMap() map[string]string { labels := make(map[string]string) labels[RoleKey] = r.service return labels } + +func (r *ringpopServiceResolver) compareMembers(addrs []string) (map[string]struct{}, bool) { + changed := false + newMembersMap := make(map[string]struct{}, len(addrs)) + for _, addr := range addrs { + newMembersMap[addr] = struct{}{} + if _, ok := r.membersMap[addr]; !ok { + changed = true + } + } + for addr := range r.membersMap { + if _, ok := newMembersMap[addr]; !ok { + changed = true + break + } + } + return newMembersMap, changed +}