Skip to content

Commit 3c3f093

Browse files
committed
rework for A68
1 parent e8ff4a1 commit 3c3f093

File tree

2 files changed

+134
-60
lines changed

2 files changed

+134
-60
lines changed

balancer/randomsubsetting/randomsubsetting.go

Lines changed: 55 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
package randomsubsetting
2525

2626
import (
27+
"cmp"
2728
"encoding/json"
28-
"errors"
2929
"fmt"
30-
"sort"
30+
"slices"
3131
"time"
3232

3333
"github.com/cespare/xxhash/v2"
@@ -72,28 +72,38 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
7272
return b
7373
}
7474

75-
// LBConfig is the config for the outlier detection balancer.
75+
// LBConfig is the config for the random subsetting balancer.
7676
type LBConfig struct {
7777
serviceconfig.LoadBalancingConfig `json:"-"`
7878

79-
SubsetSize uint64 `json:"subset_size,omitempty"`
80-
79+
SubsetSize uint64 `json:"subset_size,omitempty"`
8180
ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"`
8281
}
8382

8483
func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
8584
lbCfg := &LBConfig{
86-
// Default top layer values.
87-
SubsetSize: 10,
85+
SubsetSize: 2, // default value
86+
ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
8887
}
8988

9089
if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well.
91-
return nil, fmt.Errorf("subsetting: unable to unmarshal LBconfig: %s, error: %v", string(s), err)
90+
return nil, fmt.Errorf("randomsubsetting: unable to unmarshal LBConfig: %s, error: %v", string(s), err)
9291
}
9392

94-
// if someonw needs subsetSize == 1, he should use pick_first instead
93+
// if someone needs SubsetSize == 1, he should use pick_first instead
9594
if lbCfg.SubsetSize < 2 {
96-
return nil, errors.New("subsetting: subsetSize must be >= 2")
95+
return nil, fmt.Errorf("randomsubsetting: SubsetSize must be >= 2")
96+
}
97+
98+
if lbCfg.ChildPolicy == nil {
99+
return nil, fmt.Errorf("randomsubsetting: child policy field must be set")
100+
}
101+
102+
// Reject whole config if child policy doesn't exist, don't persist it for
103+
// later.
104+
bb := balancer.Get(lbCfg.ChildPolicy.Name)
105+
if bb == nil {
106+
return nil, fmt.Errorf("randomsubsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name)
97107
}
98108

99109
return lbCfg, nil
@@ -114,73 +124,68 @@ type subsettingBalancer struct {
114124
func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
115125
lbCfg, ok := s.BalancerConfig.(*LBConfig)
116126
if !ok {
117-
b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
127+
b.logger.Errorf("randomsubsetting: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
118128
return balancer.ErrBadResolverState
119129
}
120130

121-
// Reject whole config if child policy doesn't exist, don't persist it for
122-
// later.
123-
bb := balancer.Get(lbCfg.ChildPolicy.Name)
124-
if bb == nil {
125-
return fmt.Errorf("subsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name)
126-
}
127-
128131
if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
129-
err := b.child.SwitchTo(bb)
130-
if err != nil {
131-
return fmt.Errorf("subsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
132+
133+
if err := b.child.SwitchTo(balancer.Get(lbCfg.ChildPolicy.Name)); err != nil {
134+
return fmt.Errorf("randomsubsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
132135
}
133136
}
134137
b.cfg = lbCfg
135138

136-
err := b.child.UpdateClientConnState(balancer.ClientConnState{
137-
ResolverState: b.prepareChildResolverState(s.ResolverState),
139+
return b.child.UpdateClientConnState(balancer.ClientConnState{
140+
ResolverState: b.prepareChildResolverState(s),
138141
BalancerConfig: b.cfg.ChildPolicy.Config,
139142
})
140-
141-
return err
142143
}
143144

144-
type AddressWithHash struct {
145+
type endpointWithHash struct {
145146
hash uint64
146-
addr resolver.Address
147+
ep resolver.Endpoint
147148
}
148149

149-
// implements the subsetting algorithm, as described in A68: https://github.com/grpc/proposal/pull/423
150-
func (b *subsettingBalancer) prepareChildResolverState(s resolver.State) resolver.State {
151-
addresses := s.Addresses
152-
backendCount := len(addresses)
153-
if backendCount <= int(b.cfg.SubsetSize) {
154-
return s
150+
// implements the subsetting algorithm,
151+
// as described in A68: https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
152+
func (b *subsettingBalancer) prepareChildResolverState(s balancer.ClientConnState) resolver.State {
153+
subsetSize := b.cfg.SubsetSize
154+
endPoints := s.ResolverState.Endpoints
155+
backendCount := len(endPoints)
156+
if backendCount <= int(subsetSize) || subsetSize < 2 {
157+
return s.ResolverState
155158
}
156159

157-
addressesSet := make([]AddressWithHash, backendCount)
158160
// calculate hash for each endpoint
159-
for i, endpoint := range addresses {
160-
161-
b.hashf.Write([]byte(s.Addresses[0].String()))
162-
addressesSet[i] = AddressWithHash{
161+
endpointSet := make([]endpointWithHash, backendCount)
162+
for i, endpoint := range endPoints {
163+
b.hashf.Write([]byte(endpoint.Addresses[0].String()))
164+
endpointSet[i] = endpointWithHash{
163165
hash: b.hashf.Sum64(),
164-
addr: endpoint,
166+
ep: endpoint,
165167
}
166168
}
167-
// sort addresses by hash
168-
sort.Slice(addressesSet, func(i, j int) bool {
169-
return addressesSet[i].hash < addressesSet[j].hash
169+
170+
// sort endpoint by hash
171+
slices.SortFunc(endpointSet, func(a, b endpointWithHash) int {
172+
return cmp.Compare(a.hash, b.hash)
170173
})
171174

172-
b.logger.Infof("resulting subset: %v", addressesSet[:b.cfg.SubsetSize])
175+
if b.logger.V(2) {
176+
b.logger.Infof("randomsubsetting: resulting subset: %v", endpointSet[:subsetSize])
177+
}
173178

174-
// Convert back to resolver.addresses
175-
addressesSubset := make([]resolver.Address, b.cfg.SubsetSize)
176-
for _, eh := range addressesSet[:b.cfg.SubsetSize] {
177-
addressesSubset = append(addressesSubset, eh.addr)
179+
// Convert back to resolver.Endpoints
180+
endpointSubset := make([]resolver.Endpoint, subsetSize)
181+
for i, endpoint := range endpointSet[:subsetSize] {
182+
endpointSubset[i] = endpoint.ep
178183
}
179184

180185
return resolver.State{
181-
Addresses: addressesSubset,
182-
ServiceConfig: s.ServiceConfig,
183-
Attributes: s.Attributes,
186+
Endpoints: endpointSubset,
187+
ServiceConfig: s.ResolverState.ServiceConfig,
188+
Attributes: s.ResolverState.Attributes,
184189
}
185190
}
186191

balancer/randomsubsetting/randomsubsetting_test.go

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
*/
1818

1919
// Package e2e_test contains e2e test cases for the Subsetting LB Policy.
20-
package randomsubsetting_test
20+
package randomsubsetting
2121

2222
import (
2323
"context"
24+
"encoding/json"
2425
"fmt"
2526
"math"
27+
"strings"
2628
"testing"
2729
"time"
2830

31+
"github.com/google/go-cmp/cmp"
2932
"google.golang.org/grpc"
3033
"google.golang.org/grpc/credentials/insecure"
3134
"google.golang.org/grpc/internal"
@@ -36,13 +39,12 @@ import (
3639
"google.golang.org/grpc/resolver/manual"
3740
"google.golang.org/grpc/serviceconfig"
3841

42+
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
3943
testgrpc "google.golang.org/grpc/interop/grpc_testing"
4044
testpb "google.golang.org/grpc/interop/grpc_testing"
41-
42-
_ "google.golang.org/grpc/balancer/randomsubsetting"
4345
)
4446

45-
var defaultTestTimeout = 5 * time.Second
47+
var defaultTestTimeout = 120 * time.Second
4648

4749
type s struct {
4850
grpctest.Tester
@@ -52,6 +54,66 @@ func Test(t *testing.T) {
5254
grpctest.RunSubTests(t, s{})
5355
}
5456

57+
func (s) TestParseConfig(t *testing.T) {
58+
parser := bb{}
59+
tests := []struct {
60+
name string
61+
input string
62+
wantCfg serviceconfig.LoadBalancingConfig
63+
wantErr string
64+
}{
65+
{
66+
name: "happy-case-default",
67+
input: `{}`,
68+
wantCfg: &LBConfig{
69+
SubsetSize: 2,
70+
ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
71+
},
72+
},
73+
{
74+
name: "happy-case-subset_size-set",
75+
input: `{ "subset_size": 3 }`,
76+
wantCfg: &LBConfig{
77+
SubsetSize: 3,
78+
ChildPolicy: &iserviceconfig.BalancerConfig{Name: "round_robin"},
79+
},
80+
},
81+
{
82+
name: "subset_size-less-than-2",
83+
input: `{ "subset_size": 1,
84+
"child_policy": [{"round_robin": {}}]}`,
85+
wantErr: "randomsubsetting: SubsetSize must be >= 2",
86+
},
87+
{
88+
name: "invalid-json",
89+
input: "{{invalidjson{{",
90+
wantErr: "invalid character",
91+
},
92+
}
93+
for _, test := range tests {
94+
t.Run(test.name, func(t *testing.T) {
95+
gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
96+
// Substring match makes this very tightly coupled to the
97+
// internalserviceconfig.BalancerConfig error strings. However, it
98+
// is important to distinguish the different types of error messages
99+
// possible as the parser has a few defined buckets of ways it can
100+
// error out.
101+
if (gotErr != nil) != (test.wantErr != "") {
102+
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
103+
}
104+
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
105+
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
106+
}
107+
if test.wantErr != "" {
108+
return
109+
}
110+
if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
111+
t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
112+
}
113+
})
114+
}
115+
}
116+
55117
func setupBackends(t *testing.T, backendsCount int) ([]resolver.Address, func()) {
56118
t.Helper()
57119

@@ -123,7 +185,7 @@ func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []re
123185
return clients, cancel
124186
}
125187

126-
func checkRoundRobinRPCs(t *testing.T, ctx context.Context, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) {
188+
func checkRoundRobinRPCs(ctx context.Context, t *testing.T, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) {
127189
clientsPerBackend := map[string]map[int]struct{}{}
128190

129191
for clientIdx, client := range clients {
@@ -176,33 +238,40 @@ func (s) TestSubsettingE2E(t *testing.T) {
176238
backends int
177239
maxDiff int
178240
}{
241+
{
242+
name: "backends could be evenly distributed between small number of clients",
243+
backends: 3,
244+
clients: 2,
245+
subsetSize: 2,
246+
maxDiff: 1,
247+
},
179248
{
180249
name: "backends could be evenly distributed between clients",
181250
backends: 12,
182251
clients: 8,
183252
subsetSize: 3,
184-
maxDiff: 0,
253+
maxDiff: 3,
185254
},
186255
{
187256
name: "backends could NOT be evenly distributed between clients",
188257
backends: 37,
189258
clients: 22,
190259
subsetSize: 5,
191-
maxDiff: 2,
260+
maxDiff: 15,
192261
},
193262
{
194263
name: "Nbackends %% subsetSize == 0, but there are not enough clients to fill the last round",
195264
backends: 20,
196265
clients: 7,
197266
subsetSize: 5,
198-
maxDiff: 1,
267+
maxDiff: 20,
199268
},
200269
{
201270
name: "last round is completely filled, but there are some excluded backends on every round",
202271
backends: 21,
203272
clients: 8,
204273
subsetSize: 5,
205-
maxDiff: 1,
274+
maxDiff: 3,
206275
},
207276
}
208277
for _, test := range tests {
@@ -216,7 +285,7 @@ func (s) TestSubsettingE2E(t *testing.T) {
216285
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
217286
defer cancel()
218287

219-
checkRoundRobinRPCs(t, ctx, clients, test.subsetSize, test.maxDiff)
288+
checkRoundRobinRPCs(ctx, t, clients, test.subsetSize, test.maxDiff)
220289
})
221290
}
222291
}

0 commit comments

Comments
 (0)