
Commit 00514a7

xds/clusterimpl: update UpdateClientConnState to handle updates synchronously (#7533)
1 parent 093e099 commit 00514a7

File tree

1 file changed: +87 -119 lines


xds/internal/balancer/clusterimpl/clusterimpl.go

Lines changed: 87 additions & 119 deletions
@@ -24,6 +24,7 @@
 package clusterimpl
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"sync"
@@ -33,7 +34,6 @@ import (
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/balancer/gracefulswitch"
-	"google.golang.org/grpc/internal/buffer"
 	"google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/pretty"
@@ -53,7 +53,10 @@ const (
 	defaultRequestCountMax = 1024
 )
 
-var connectedAddress = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+var (
+	connectedAddress  = internal.ConnectedAddress.(func(balancer.SubConnState) resolver.Address)
+	errBalancerClosed = fmt.Errorf("%s LB policy is closed", Name)
+)
 
 func init() {
 	balancer.Register(bb{})
@@ -62,18 +65,17 @@ func init() {
 type bb struct{}
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	ctx, cancel := context.WithCancel(context.Background())
 	b := &clusterImplBalancer{
-		ClientConn:      cc,
-		bOpts:           bOpts,
-		closed:          grpcsync.NewEvent(),
-		done:            grpcsync.NewEvent(),
-		loadWrapper:     loadstore.NewWrapper(),
-		pickerUpdateCh:  buffer.NewUnbounded(),
-		requestCountMax: defaultRequestCountMax,
+		ClientConn:       cc,
+		bOpts:            bOpts,
+		loadWrapper:      loadstore.NewWrapper(),
+		requestCountMax:  defaultRequestCountMax,
+		serializer:       grpcsync.NewCallbackSerializer(ctx),
+		serializerCancel: cancel,
 	}
 	b.logger = prefixLogger(b)
 	b.child = gracefulswitch.NewBalancer(b, bOpts)
-	go b.run()
 	b.logger.Infof("Created")
 	return b
 }
@@ -89,18 +91,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
 type clusterImplBalancer struct {
 	balancer.ClientConn
 
-	// mu guarantees mutual exclusion between Close() and handling of picker
-	// update to the parent ClientConn in run(). It's to make sure that the
-	// run() goroutine doesn't send picker update to parent after the balancer
-	// is closed.
-	//
-	// It's only used by the run() goroutine, but not the other exported
-	// functions. Because the exported functions are guaranteed to be
-	// synchronized with Close().
-	mu     sync.Mutex
-	closed *grpcsync.Event
-	done   *grpcsync.Event
-
 	bOpts     balancer.BuildOptions
 	logger    *grpclog.PrefixLogger
 	xdsClient xdsclient.XDSClient
@@ -115,10 +105,11 @@
 	clusterNameMu sync.Mutex
 	clusterName   string
 
+	serializer       *grpcsync.CallbackSerializer
+	serializerCancel context.CancelFunc
+
 	// childState/drops/requestCounter keeps the state used by the most recently
-	// generated picker. All fields can only be accessed in run(). And run() is
-	// the only goroutine that sends picker to the parent ClientConn. All
-	// requests to update picker need to be sent to pickerUpdateCh.
+	// generated picker.
 	childState      balancer.State
 	dropCategories  []DropConfig // The categories for drops.
 	drops           []*dropper
@@ -127,7 +118,6 @@
 	requestCounter  *xdsclient.ClusterRequestsCounter
 	requestCountMax uint32
 	telemetryLabels map[string]string
-	pickerUpdateCh  *buffer.Unbounded
 }
 
 // updateLoadStore checks the config for load store, and decides whether it
@@ -208,14 +198,9 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
 	return nil
 }
 
-func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
-		return nil
-	}
-
+func (b *clusterImplBalancer) updateClientConnState(s balancer.ClientConnState) error {
 	if b.logger.V(2) {
-		b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
+		b.logger.Infof("Received configuration: %s", pretty.ToJSON(s.BalancerConfig))
 	}
 	newConfig, ok := s.BalancerConfig.(*LBConfig)
 	if !ok {
@@ -227,7 +212,7 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 	// it.
 	bb := balancer.Get(newConfig.ChildPolicy.Name)
 	if bb == nil {
-		return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
+		return fmt.Errorf("child policy %q not registered", newConfig.ChildPolicy.Name)
 	}
 
 	if b.xdsClient == nil {
@@ -253,9 +238,14 @@
 	}
 	b.config = newConfig
 
-	// Notify run() of this new config, in case drop and request counter need
-	// update (which means a new picker needs to be generated).
-	b.pickerUpdateCh.Put(newConfig)
+	b.telemetryLabels = newConfig.TelemetryLabels
+	dc := b.handleDropAndRequestCount(newConfig)
+	if dc != nil && b.childState.Picker != nil {
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker:            b.newPicker(dc),
+		})
+	}
 
 	// Addresses and sub-balancer config are sent to sub-balancer.
 	return b.child.UpdateClientConnState(balancer.ClientConnState{
@@ -264,20 +254,28 @@
 	})
 }
 
-func (b *clusterImplBalancer) ResolverError(err error) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
-		return
+func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	// Handle the update in a blocking fashion.
+	errCh := make(chan error, 1)
+	callback := func(context.Context) {
+		errCh <- b.updateClientConnState(s)
 	}
-	b.child.ResolverError(err)
+	onFailure := func() {
+		// An attempt to schedule callback fails only when an update is received
+		// after Close().
+		errCh <- errBalancerClosed
+	}
+	b.serializer.ScheduleOr(callback, onFailure)
+	return <-errCh
 }
 
-func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
-	if b.closed.HasFired() {
-		b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
-		return
-	}
+func (b *clusterImplBalancer) ResolverError(err error) {
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ResolverError(err)
+	})
+}
 
+func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
 	// Trigger re-resolution when a SubConn turns transient failure. This is
 	// necessary for the LogicalDNS in cluster_resolver policy to re-resolve.
 	//
@@ -299,26 +297,40 @@ func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer
 }
 
 func (b *clusterImplBalancer) Close() {
-	b.mu.Lock()
-	b.closed.Fire()
-	b.mu.Unlock()
-
-	b.child.Close()
-	b.childState = balancer.State{}
-	b.pickerUpdateCh.Close()
-	<-b.done.Done()
-	b.logger.Infof("Shutdown")
+	b.serializer.TrySchedule(func(ctx context.Context) {
+		b.child.Close()
+		b.childState = balancer.State{}
+
+		if b.cancelLoadReport != nil {
+			b.cancelLoadReport()
+			b.cancelLoadReport = nil
+		}
+		b.logger.Infof("Shutdown")
+	})
+	b.serializerCancel()
+	<-b.serializer.Done()
 }
 
 func (b *clusterImplBalancer) ExitIdle() {
-	b.child.ExitIdle()
+	b.serializer.TrySchedule(func(context.Context) {
+		b.child.ExitIdle()
+	})
 }
 
 // Override methods to accept updates from the child LB.
 
 func (b *clusterImplBalancer) UpdateState(state balancer.State) {
-	// Instead of updating parent ClientConn inline, send state to run().
-	b.pickerUpdateCh.Put(state)
+	b.serializer.TrySchedule(func(context.Context) {
+		b.childState = state
+		b.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: b.childState.ConnectivityState,
+			Picker: b.newPicker(&dropConfigs{
+				drops:           b.drops,
+				requestCounter:  b.requestCounter,
+				requestCountMax: b.requestCountMax,
+			}),
+		})
+	})
 }
 
 func (b *clusterImplBalancer) setClusterName(n string) {
@@ -370,21 +382,23 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
 	scw := &scWrapper{}
 	oldListener := opts.StateListener
 	opts.StateListener = func(state balancer.SubConnState) {
-		b.updateSubConnState(sc, state, oldListener)
-		if state.ConnectivityState != connectivity.Ready {
-			return
-		}
-		// Read connected address and call updateLocalityID() based on the connected
-		// address's locality. https://github.com/grpc/grpc-go/issues/7339
-		addr := connectedAddress(state)
-		lID := xdsinternal.GetLocalityID(addr)
-		if lID.Empty() {
-			if b.logger.V(2) {
-				b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+		b.serializer.TrySchedule(func(context.Context) {
+			b.updateSubConnState(sc, state, oldListener)
+			if state.ConnectivityState != connectivity.Ready {
+				return
 			}
-			return
-		}
-		scw.updateLocalityID(lID)
+			// Read connected address and call updateLocalityID() based on the connected
+			// address's locality. https://github.com/grpc/grpc-go/issues/7339
+			addr := connectedAddress(state)
+			lID := xdsinternal.GetLocalityID(addr)
+			if lID.Empty() {
+				if b.logger.V(2) {
+					b.logger.Infof("Locality ID for %s unexpectedly empty", addr)
+				}
+				return
+			}
+			scw.updateLocalityID(lID)
+		})
 	}
 	sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
 	if err != nil {
@@ -464,49 +478,3 @@ func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dr
 		requestCountMax: b.requestCountMax,
 	}
 }
-
-func (b *clusterImplBalancer) run() {
-	defer b.done.Fire()
-	for {
-		select {
-		case update, ok := <-b.pickerUpdateCh.Get():
-			if !ok {
-				return
-			}
-			b.pickerUpdateCh.Load()
-			b.mu.Lock()
-			if b.closed.HasFired() {
-				b.mu.Unlock()
-				return
-			}
-			switch u := update.(type) {
-			case balancer.State:
-				b.childState = u
-				b.ClientConn.UpdateState(balancer.State{
-					ConnectivityState: b.childState.ConnectivityState,
-					Picker: b.newPicker(&dropConfigs{
-						drops:           b.drops,
-						requestCounter:  b.requestCounter,
-						requestCountMax: b.requestCountMax,
-					}),
-				})
-			case *LBConfig:
-				b.telemetryLabels = u.TelemetryLabels
-				dc := b.handleDropAndRequestCount(u)
-				if dc != nil && b.childState.Picker != nil {
-					b.ClientConn.UpdateState(balancer.State{
-						ConnectivityState: b.childState.ConnectivityState,
-						Picker:            b.newPicker(dc),
-					})
-				}
-			}
-			b.mu.Unlock()
-		case <-b.closed.Done():
-			if b.cancelLoadReport != nil {
-				b.cancelLoadReport()
-				b.cancelLoadReport = nil
-			}
-			return
-		}
-	}
-}
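
The deleted run() goroutine multiplexed picker updates, config changes, and shutdown through an unbounded buffer; the serializer keeps the same single-goroutine guarantee while letting UpdateClientConnState block until its callback has executed. The sketch below is a minimal, self-contained Go illustration of that schedule-and-block pattern; the toy serializer and the names serializer, scheduleOr, and errClosed are hypothetical stand-ins, not grpc-go's internal grpcsync.CallbackSerializer API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("LB policy is closed")

// serializer runs callbacks one at a time on a single goroutine, so state
// touched only from inside callbacks needs no extra locking.
type serializer struct {
	mu     sync.Mutex
	closed bool
	cbs    chan func()
	done   chan struct{}
}

func newSerializer() *serializer {
	s := &serializer{cbs: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for cb := range s.cbs { // drains queued callbacks, then exits
			cb()
		}
	}()
	return s
}

// scheduleOr enqueues cb, or runs onFailure inline if close() already ran.
func (s *serializer) scheduleOr(cb func(), onFailure func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		onFailure()
		return
	}
	s.cbs <- cb
}

// close stops accepting callbacks and waits for queued ones to finish,
// mirroring serializerCancel() followed by <-b.serializer.Done() in the diff.
func (s *serializer) close() {
	s.mu.Lock()
	s.closed = true
	close(s.cbs)
	s.mu.Unlock()
	<-s.done
}

// updateClientConnState shows the blocking shape of the new
// UpdateClientConnState: the work runs on the serializer goroutine, but the
// caller blocks on errCh, so the update is handled synchronously.
func updateClientConnState(s *serializer, doUpdate func() error) error {
	errCh := make(chan error, 1) // buffered: onFailure may run inline
	s.scheduleOr(
		func() { errCh <- doUpdate() },
		func() { errCh <- errClosed },
	)
	return <-errCh
}

func main() {
	s := newSerializer()
	fmt.Println(updateClientConnState(s, func() error { return nil })) // <nil>
	s.close()
	fmt.Println(updateClientConnState(s, func() error { return nil })) // LB policy is closed
}

The buffered error channel mirrors the diff's make(chan error, 1): when the balancer is already closed, onFailure runs inline on the caller's goroutine before the receive on errCh starts, and an unbuffered send there would deadlock.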
