From f9409d385fbaa24c04d2c634aafe974f4f13a042 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 4 Aug 2022 12:11:45 -0700 Subject: [PATCH] ringhash: handle config updates properly (#5557) --- xds/internal/balancer/ringhash/ring.go | 34 +++++++------- xds/internal/balancer/ringhash/ring_test.go | 6 +-- xds/internal/balancer/ringhash/ringhash.go | 44 ++++++++++--------- .../balancer/ringhash/ringhash_test.go | 42 ++++++++++++++++-- 4 files changed, 81 insertions(+), 45 deletions(-) diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 5e8d881006e6..71d31eaeb8b0 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -19,7 +19,6 @@ package ringhash import ( - "fmt" "math" "sort" "strconv" @@ -64,12 +63,12 @@ type ringEntry struct { // // To pick from a ring, a binary search will be done for the given target hash, // and first item with hash >= given hash will be returned. -func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) { +// +// Must be called with a non-empty subConns map. +func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring { // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 - normalizedWeights, minWeight, err := normalizeWeights(subConns) - if err != nil { - return nil, err - } + normalizedWeights, minWeight := normalizeWeights(subConns) + // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. // Scale up the size of the ring such that the least-weighted host gets a @@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r for i, ii := range items { ii.idx = i } - return &ring{items: items}, nil + return &ring{items: items} } // normalizeWeights divides all the weights by the sum, so that the total weight // is 1. 
-func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) { - keys := subConns.Keys() - if len(keys) == 0 { - return nil, 0, fmt.Errorf("number of subconns is 0") - } +// +// Must be called with a non-empty subConns map. +func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) { var weightSum uint32 + keys := subConns.Keys() for _, a := range keys { weightSum += getWeightAttribute(a) } - if weightSum == 0 { - return nil, 0, fmt.Errorf("total weight of all subconns is 0") - } - weightSumF := float64(weightSum) ret := make([]subConnWithWeight, 0, len(keys)) min := float64(1.0) for _, a := range keys { v, _ := subConns.Get(a) scInfo := v.(*subConn) - nw := float64(getWeightAttribute(a)) / weightSumF + // getWeightAttribute() returns 1 if the weight attribute is not found + // on the address. And since this function is guaranteed to be called + // with a non-empty subConns map, weightSum is guaranteed to be + // non-zero. So, we need not worry about a divide by zero error here. + nw := float64(getWeightAttribute(a)) / float64(weightSum) ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw}) if nw < min { min = nw @@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float // where an address is added and then removed, the RPCs will still pick the // same old SubConn. sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr }) - return ret, min, nil + return ret, min } // pick does a binary search. 
It returns the item with smallest index i that diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index 20184ab8d20e..b1d987609903 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) { for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { - r, _ := newRing(testSubConnMap, min, max) + r := newRing(testSubConnMap, min, max) totalCount := len(r.items) if totalCount < int(min) || totalCount > int(max) { t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max) @@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool { } func (s) TestRingPick(t *testing.T) { - r, _ := newRing(testSubConnMap, 10, 20) + r := newRing(testSubConnMap, 10, 20) for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} { t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) { e := r.pick(h) @@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) { } func (s) TestRingNext(t *testing.T) { - r, _ := newRing(testSubConnMap, 10, 20) + r := newRing(testSubConnMap, 10, 20) for _, e := range r.items { ne := r.next(e) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 3e06fc4eb6eb..e2ad49fca4ab 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -259,29 +259,22 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) - if b.config == nil { - newConfig, ok := s.BalancerConfig.(*LBConfig) - if !ok { - return fmt.Errorf("unexpected balancer 
config with type: %T", s.BalancerConfig) - } - b.config = newConfig + newConfig, ok := s.BalancerConfig.(*LBConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - // Successful resolution; clear resolver error and ensure we return nil. - b.resolverErr = nil - if b.updateAddresses(s.ResolverState.Addresses) { - // If addresses were updated, no matter whether it resulted in SubConn - // creation/deletion, or just weight update, we will need to regenerate - // the ring. - var err error - b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) - if err != nil { - b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err)) - return balancer.ErrBadResolverState - } - b.regeneratePicker() - b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + // If addresses were updated, whether it resulted in SubConn + // creation/deletion, or just weight update, we need to regenerate the ring + // and send a new picker. + regenerateRing := b.updateAddresses(s.ResolverState.Addresses) + + // If the ring configuration has changed, we need to regenerate the ring and + // send a new picker. + if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize { + regenerateRing = true } + b.config = newConfig // If resolver state contains no addresses, return an error so ClientConn // will trigger re-resolve. Also records this as an resolver error, so when @@ -291,6 +284,17 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } + + if regenerateRing { + // Ring creation is guaranteed to not fail because we call newRing() + // with a non-empty subConns map. 
+ b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) + b.regeneratePicker() + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + } + + // Successful resolution; clear resolver error and return nil. + b.resolverErr = nil return nil } diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 02302321ce5e..e5b10556e982 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -108,6 +108,40 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash +// LB policy receives new configuration which specifies new values for the ring +// min and max sizes. The test verifies that a new ring is created and a new +// picker is sent to the ClientConn. +func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) { + origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest` + newMinRingSize, newMaxRingSize := 20, 100 + + addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}} + cc, b, p1 := setupTest(t, addrs) + ring1 := p1.(*picker).ring + if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize) + } + + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)}, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + + var ring2 *ring + select { + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for a picker update after a configuration update") + case p2 := <-cc.NewPickerCh: + ring2 = p2.(*picker).ring + } + if ringSize := len(ring2.items); ringSize < 
newMinRingSize || ringSize > newMaxRingSize { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize) + } +} + func (s) TestOneSubConn(t *testing.T) { wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]} cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1}) @@ -320,7 +354,7 @@ func (s) TestAddrWeightChange(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: wantAddrs}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -336,7 +370,7 @@ func (s) TestAddrWeightChange(t *testing.T) { {Addr: testBackendAddrStrs[0]}, {Addr: testBackendAddrStrs[1]}, }}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -359,7 +393,7 @@ func (s) TestAddrWeightChange(t *testing.T) { resolver.Address{Addr: testBackendAddrStrs[1]}, weightedroundrobin.AddrInfo{Weight: 2}), }}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -505,7 +539,7 @@ func (s) TestAddrBalancerAttributesChange(t *testing.T) { addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs2}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) }