Skip to content

Commit fddce60

Browse files
committed
Merge branch 'master' of https://github.com/pvsravani/grpc-go into stubserver-test
2 parents 29216bd + 79b6830 commit fddce60

File tree

84 files changed

+3283
-1060
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+3283
-1060
lines changed

balancer/endpointsharding/endpointsharding.go

Lines changed: 26 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,37 +26,17 @@
2626
package endpointsharding
2727

2828
import (
29-
"encoding/json"
3029
"errors"
31-
"fmt"
3230
rand "math/rand/v2"
3331
"sync"
3432
"sync/atomic"
3533

3634
"google.golang.org/grpc/balancer"
3735
"google.golang.org/grpc/balancer/base"
38-
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
3936
"google.golang.org/grpc/connectivity"
40-
"google.golang.org/grpc/grpclog"
41-
"google.golang.org/grpc/internal/balancer/gracefulswitch"
4237
"google.golang.org/grpc/resolver"
43-
"google.golang.org/grpc/serviceconfig"
4438
)
4539

46-
var (
47-
// PickFirstConfig is a pick first config without shuffling enabled.
48-
PickFirstConfig serviceconfig.LoadBalancingConfig
49-
logger = grpclog.Component("endpoint-sharding")
50-
)
51-
52-
func init() {
53-
var err error
54-
PickFirstConfig, err = ParseConfig(json.RawMessage(fmt.Sprintf("[{%q: {}}]", pickfirstleaf.Name)))
55-
if err != nil {
56-
logger.Fatal(err)
57-
}
58-
}
59-
6040
// ChildState is the balancer state of a child along with the endpoint which
6141
// identifies the child balancer.
6242
type ChildState struct {
@@ -68,26 +48,30 @@ type ChildState struct {
6848
Balancer balancer.ExitIdler
6949
}
7050

71-
// NewBalancer returns a load balancing policy that manages homogeneous child
72-
// policies each owning a single endpoint. The balancer will automatically call
73-
// ExitIdle on its children if they report IDLE connectivity state.
74-
func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
75-
return newBlanacer(cc, opts, true)
51+
// Options are the options to configure the behaviour of the
52+
// endpointsharding balancer.
53+
type Options struct {
54+
// DisableAutoReconnect allows the balancer to keep child balancer in the
55+
// IDLE state until they are explicitly triggered to exit using the
56+
// ChildState obtained from the endpointsharding picker. When set to false,
57+
// the endpointsharding balancer will automatically call ExitIdle on child
58+
// connections that report IDLE.
59+
DisableAutoReconnect bool
7660
}
7761

78-
// NewBalancerWithoutAutoReconnect returns a load balancing policy that manages
79-
// homogeneous child policies each owning a single endpoint. The balancer will
80-
// allow children to remain in IDLE state until triggered to exit idle state
81-
// using the ChildState obtained using the endpointsharding picker.
82-
func NewBalancerWithoutAutoReconnect(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
83-
return newBlanacer(cc, opts, false)
84-
}
62+
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
63+
// type as the balancer.Builder.Build method.
64+
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
8565

86-
func newBlanacer(cc balancer.ClientConn, opts balancer.BuildOptions, autoReconnect bool) balancer.Balancer {
66+
// NewBalancer returns a load balancing policy that manages homogeneous child
67+
// policies each owning a single endpoint. The endpointsharding balancer
68+
// forwards the LoadBalancingConfig in ClientConn state updates to its children.
69+
func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
8770
es := &endpointSharding{
88-
cc: cc,
89-
bOpts: opts,
90-
enableAutoReconnect: autoReconnect,
71+
cc: cc,
72+
bOpts: opts,
73+
esOpts: esOpts,
74+
childBuilder: childBuilder,
9175
}
9276
es.children.Store(resolver.NewEndpointMap())
9377
return es
@@ -97,9 +81,10 @@ func newBlanacer(cc balancer.ClientConn, opts balancer.BuildOptions, autoReconne
9781
// balancer with child config for every unique Endpoint received. It updates the
9882
// child states on any update from parent or child.
9983
type endpointSharding struct {
100-
cc balancer.ClientConn
101-
bOpts balancer.BuildOptions
102-
enableAutoReconnect bool
84+
cc balancer.ClientConn
85+
bOpts balancer.BuildOptions
86+
esOpts Options
87+
childBuilder ChildBuilderFunc
10388

10489
// childMu synchronizes calls to any single child. It must be held for all
10590
// calls into a child. To avoid deadlocks, do not acquire childMu while
@@ -160,7 +145,7 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
160145
es: es,
161146
}
162147
childBalancer.childState.Balancer = childBalancer
163-
childBalancer.child = gracefulswitch.NewBalancer(childBalancer, es.bOpts)
148+
childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
164149
}
165150
newChildren.Set(endpoint, childBalancer)
166151
if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
@@ -334,7 +319,7 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
334319
bw.es.mu.Lock()
335320
bw.childState.State = state
336321
bw.es.mu.Unlock()
337-
if state.ConnectivityState == connectivity.Idle && bw.es.enableAutoReconnect {
322+
if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
338323
bw.ExitIdle()
339324
}
340325
bw.es.updateState()
@@ -370,12 +355,3 @@ func (bw *balancerWrapper) closeLocked() {
370355
bw.child.Close()
371356
bw.isClosed = true
372357
}
373-
374-
// ParseConfig parses a child config list and returns an LB config to use with
375-
// the endpointsharding balancer.
376-
//
377-
// cfg is expected to be a JSON array of LB policy names + configs as the
378-
// format of the loadBalancingConfig field in ServiceConfig.
379-
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
380-
return gracefulswitch.ParseConfig(cfg)
381-
}

balancer/endpointsharding/endpointsharding_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"google.golang.org/grpc"
3131
"google.golang.org/grpc/balancer"
3232
"google.golang.org/grpc/balancer/endpointsharding"
33+
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
3334
"google.golang.org/grpc/codes"
3435
"google.golang.org/grpc/credentials/insecure"
3536
"google.golang.org/grpc/grpclog"
@@ -80,7 +81,7 @@ func (fakePetioleBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
8081
ClientConn: cc,
8182
bOpts: opts,
8283
}
83-
fp.Balancer = endpointsharding.NewBalancer(fp, opts)
84+
fp.Balancer = endpointsharding.NewBalancer(fp, opts, balancer.Get(pickfirstleaf.Name).Build, endpointsharding.Options{})
8485
return fp
8586
}
8687

@@ -103,10 +104,7 @@ func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) err
103104
return fmt.Errorf("UpdateClientConnState wants two endpoints, got: %v", el)
104105
}
105106

106-
return fp.Balancer.UpdateClientConnState(balancer.ClientConnState{
107-
BalancerConfig: endpointsharding.PickFirstConfig,
108-
ResolverState: state.ResolverState,
109-
})
107+
return fp.Balancer.UpdateClientConnState(state)
110108
}
111109

112110
func (fp *fakePetiole) UpdateState(state balancer.State) {
@@ -182,13 +180,11 @@ func (s) TestEndpointShardingReconnectDisabled(t *testing.T) {
182180
name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "")
183181
bf := stub.BalancerFuncs{
184182
Init: func(bd *stub.BalancerData) {
185-
bd.Data = endpointsharding.NewBalancerWithoutAutoReconnect(bd.ClientConn, bd.BuildOptions)
183+
epOpts := endpointsharding.Options{DisableAutoReconnect: true}
184+
bd.Data = endpointsharding.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build, epOpts)
186185
},
187186
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
188-
return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{
189-
BalancerConfig: endpointsharding.PickFirstConfig,
190-
ResolverState: ccs.ResolverState,
191-
})
187+
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
192188
},
193189
Close: func(bd *stub.BalancerData) {
194190
bd.Data.(balancer.Balancer).Close()

balancer/lazy/lazy.go

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,44 +26,35 @@
2626
package lazy
2727

2828
import (
29-
"encoding/json"
3029
"fmt"
3130
"sync"
3231

3332
"google.golang.org/grpc/balancer"
34-
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
3533
"google.golang.org/grpc/connectivity"
3634
"google.golang.org/grpc/grpclog"
37-
"google.golang.org/grpc/internal/balancer/gracefulswitch"
3835
"google.golang.org/grpc/resolver"
39-
"google.golang.org/grpc/serviceconfig"
4036

4137
internalgrpclog "google.golang.org/grpc/internal/grpclog"
4238
)
4339

44-
func init() {
45-
balancer.Register(builder{})
46-
}
47-
4840
var (
49-
// PickfirstConfig is the LB policy config json for a pick_first load
50-
// balancer that is lazily initialized.
51-
PickfirstConfig = fmt.Sprintf("{\"childPolicy\": [{%q: {}}]}", pickfirstleaf.Name)
52-
logger = grpclog.Component("lazy-lb")
41+
logger = grpclog.Component("lazy-lb")
5342
)
5443

5544
const (
56-
// Name is the name of the lazy balancer.
57-
Name = "lazy"
5845
logPrefix = "[lazy-lb %p] "
5946
)
6047

61-
type builder struct{}
48+
// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
49+
// type as the balancer.Builder.Build method.
50+
type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
6251

63-
func (builder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
52+
// NewBalancer is the constructor for the lazy balancer.
53+
func NewBalancer(cc balancer.ClientConn, bOpts balancer.BuildOptions, childBuilder ChildBuilderFunc) balancer.Balancer {
6454
b := &lazyBalancer{
6555
cc: cc,
6656
buildOptions: bOpts,
57+
childBuilder: childBuilder,
6758
}
6859
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
6960
cc.UpdateState(balancer.State{
@@ -77,16 +68,13 @@ func (builder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balanc
7768
return b
7869
}
7970

80-
func (builder) Name() string {
81-
return Name
82-
}
83-
8471
type lazyBalancer struct {
8572
// The following fields are initialized at build time and read-only after
8673
// that and therefore do not need to be guarded by a mutex.
8774
cc balancer.ClientConn
8875
buildOptions balancer.BuildOptions
8976
logger *internalgrpclog.PrefixLogger
77+
childBuilder ChildBuilderFunc
9078

9179
// The following fields are accessed while handling calls to the idlePicker
9280
// and when handling ClientConn state updates. They are guarded by a mutex.
@@ -119,12 +107,6 @@ func (lb *lazyBalancer) ResolverError(err error) {
119107
func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
120108
lb.mu.Lock()
121109
defer lb.mu.Unlock()
122-
childLBCfg, ok := ccs.BalancerConfig.(lbCfg)
123-
if !ok {
124-
lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig)
125-
return balancer.ErrBadResolverState
126-
}
127-
ccs.BalancerConfig = childLBCfg.childLBCfg
128110
if lb.delegate != nil {
129111
return lb.delegate.UpdateClientConnState(ccs)
130112
}
@@ -148,7 +130,7 @@ func (lb *lazyBalancer) ExitIdle() {
148130
}
149131
return
150132
}
151-
lb.delegate = gracefulswitch.NewBalancer(lb.cc, lb.buildOptions)
133+
lb.delegate = lb.childBuilder(lb.cc, lb.buildOptions)
152134
if lb.latestClientConnState != nil {
153135
if err := lb.delegate.UpdateClientConnState(*lb.latestClientConnState); err != nil {
154136
if err == balancer.ErrBadResolverState {
@@ -165,25 +147,6 @@ func (lb *lazyBalancer) ExitIdle() {
165147
}
166148
}
167149

168-
type lbCfg struct {
169-
serviceconfig.LoadBalancingConfig
170-
childLBCfg serviceconfig.LoadBalancingConfig
171-
}
172-
173-
func (b builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
174-
jsonReprsentation := &struct {
175-
ChildPolicy json.RawMessage
176-
}{}
177-
if err := json.Unmarshal(lbConfig, jsonReprsentation); err != nil {
178-
return nil, err
179-
}
180-
childCfg, err := gracefulswitch.ParseConfig(jsonReprsentation.ChildPolicy)
181-
if err != nil {
182-
return nil, err
183-
}
184-
return lbCfg{childLBCfg: childCfg}, nil
185-
}
186-
187150
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
188151
// CONNECTING when Pick is called.
189152
type idlePicker struct {

0 commit comments

Comments
 (0)