Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[membership] Replace Ringop with PeerProvider interface #4653

Merged
Prev Previous commit
Next Next commit
refreshLocked
  • Loading branch information
mantas-sidlauskas committed Nov 23, 2021
commit cce32dc1f42b28dbd505d1474c3e97735514f61a
26 changes: 11 additions & 15 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *ring) Start() {
}
r.peerProvider.Subscribe(r.service, r.refreshChan)

if err := r.refresh(); err != nil {
if err := r.refreshLocked(); err != nil {
r.logger.Fatal("failed to start service resolver", tag.Error(err))
}

Expand Down Expand Up @@ -206,21 +206,9 @@ func (r *ring) Members() []*HostInfo {
return servers
}

func (r *ring) refresh() error {
func (r *ring) refreshLocked() error {
r.refreshLock.Lock()
defer r.refreshLock.Unlock()
return r.refreshNoLock()
}

func (r *ring) refreshWithBackoff() error {
if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) {
// refresh too frequently
return nil
}
return r.refresh()
}

func (r *ring) refreshNoLock() error {
addrs, err := r.peerProvider.GetMembers(r.service)

if err != nil {
Expand Down Expand Up @@ -248,6 +236,14 @@ func (r *ring) refreshNoLock() error {
return nil
}

func (r *ring) refreshWithBackoff() error {
if r.lastRefreshTime.After(time.Now().Add(-minRefreshInternal)) {
// refreshLocked too frequently
return nil
}
return r.refreshLocked()
}

func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

Expand All @@ -261,7 +257,7 @@ func (r *ring) refreshRingWorker() {
if err := r.refreshWithBackoff(); err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
case <-refreshTicker.C: // periodically refresh membership
case <-refreshTicker.C: // periodically refreshLocked membership
if err := r.refreshWithBackoff(); err != nil {
r.logger.Error("periodically refreshing ring", tag.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
hr := newHashring("test-service", pp, log.NewNoop())
hr.Start()

hr.refresh()
hr.refreshLocked()
updatedAt := hr.lastRefreshTime
hr.refresh()
hr.refreshLocked()
assert.Equal(t, updatedAt, hr.lastRefreshTime)

}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
pp.EXPECT().GetMembers(gomock.Any()).Return([]string{}, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop())
assert.Error(t, hr.refresh())
assert.Error(t, hr.refreshLocked())
}

func TestStopWillStopProvider(t *testing.T) {
Expand Down