Skip to content

Commit 08ec446

Browse files
arjan-balpurnesh42H
authored andcommitted
balancergroup: Make closing terminal (grpc#8095)
1 parent aa10c9d commit 08ec446

File tree

6 files changed

+60
-117
lines changed

6 files changed

+60
-117
lines changed

balancer/rls/balancer.go

-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
148148
Logger: lb.logger,
149149
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
150150
})
151-
lb.bg.Start()
152151
go lb.run()
153152
return lb
154153
}

balancer/weightedtarget/weightedtarget.go

-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
6262
Logger: b.logger,
6363
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
6464
})
65-
b.bg.Start()
6665
b.logger.Infof("Created")
6766
return b
6867
}

internal/balancergroup/balancergroup.go

+58-63
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ type BalancerGroup struct {
194194
// The corresponding boolean outgoingStarted is used to stop further updates
195195
// to sub-balancers after they are closed.
196196
outgoingMu sync.Mutex
197-
outgoingStarted bool
197+
outgoingClosed bool
198198
idToBalancerConfig map[string]*subBalancerWrapper
199199
// Cache for sub-balancers when they are removed. This is `nil` if caching
200200
// is disabled by passing `0` for Options.SubBalancerCloseTimeout`.
@@ -224,7 +224,7 @@ type BalancerGroup struct {
224224
// The corresponding boolean incomingStarted is used to stop further updates
225225
// from sub-balancers after they are closed.
226226
incomingMu sync.Mutex
227-
incomingStarted bool // This boolean only guards calls back to ClientConn.
227+
incomingClosed bool // This boolean only guards calls back to ClientConn.
228228
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
229229
}
230230

@@ -265,30 +265,6 @@ func New(opts Options) *BalancerGroup {
265265
}
266266
}
267267

268-
// Start starts the balancer group, including building all the sub-balancers,
269-
// and send the existing addresses to them.
270-
//
271-
// A BalancerGroup can be closed and started later. When a BalancerGroup is
272-
// closed, it can still receive address updates, which will be applied when
273-
// restarted.
274-
func (bg *BalancerGroup) Start() {
275-
bg.incomingMu.Lock()
276-
bg.incomingStarted = true
277-
bg.incomingMu.Unlock()
278-
279-
bg.outgoingMu.Lock()
280-
if bg.outgoingStarted {
281-
bg.outgoingMu.Unlock()
282-
return
283-
}
284-
285-
for _, config := range bg.idToBalancerConfig {
286-
config.startBalancer()
287-
}
288-
bg.outgoingStarted = true
289-
bg.outgoingMu.Unlock()
290-
}
291-
292268
// AddWithClientConn adds a balancer with the given id to the group. The
293269
// balancer is built with a balancer builder registered with balancerName. The
294270
// given ClientConn is passed to the newly built balancer instead of the
@@ -299,17 +275,18 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
299275
bg.logger.Infof("Adding child policy of type %q for child %q", balancerName, id)
300276
builder := balancer.Get(balancerName)
301277
if builder == nil {
302-
return fmt.Errorf("unregistered balancer name %q", balancerName)
278+
return fmt.Errorf("balancergroup: unregistered balancer name %q", balancerName)
303279
}
304280

305281
// Store data in static map, and then check to see if bg is started.
306282
bg.outgoingMu.Lock()
307283
defer bg.outgoingMu.Unlock()
284+
if bg.outgoingClosed {
285+
return fmt.Errorf("balancergroup: already closed")
286+
}
308287
var sbc *subBalancerWrapper
309-
// If outgoingStarted is true, search in the cache. Otherwise, cache is
310-
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
311-
// caching is disabled.
312-
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
288+
// Skip searching the cache if disabled.
289+
if bg.deletedBalancerCache != nil {
313290
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
314291
if bg.logger.V(2) {
315292
bg.logger.Infof("Removing and reusing child policy of type %q for child %q from the balancer cache", balancerName, id)
@@ -341,11 +318,7 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
341318
builder: builder,
342319
buildOpts: bg.buildOpts,
343320
}
344-
if bg.outgoingStarted {
345-
// Only start the balancer if bg is started. Otherwise, we only keep the
346-
// static data.
347-
sbc.startBalancer()
348-
}
321+
sbc.startBalancer()
349322
} else {
350323
// When brining back a sub-balancer from cache, re-send the cached
351324
// picker and state.
@@ -369,6 +342,10 @@ func (bg *BalancerGroup) Remove(id string) {
369342
bg.logger.Infof("Removing child policy for child %q", id)
370343

371344
bg.outgoingMu.Lock()
345+
if bg.outgoingClosed {
346+
bg.outgoingMu.Unlock()
347+
return
348+
}
372349

373350
sbToRemove, ok := bg.idToBalancerConfig[id]
374351
if !ok {
@@ -379,12 +356,6 @@ func (bg *BalancerGroup) Remove(id string) {
379356

380357
// Unconditionally remove the sub-balancer config from the map.
381358
delete(bg.idToBalancerConfig, id)
382-
if !bg.outgoingStarted {
383-
// Nothing needs to be done here, since we wouldn't have created the
384-
// sub-balancer.
385-
bg.outgoingMu.Unlock()
386-
return
387-
}
388359

389360
if bg.deletedBalancerCache != nil {
390361
if bg.logger.V(2) {
@@ -424,6 +395,7 @@ func (bg *BalancerGroup) Remove(id string) {
424395
// cleanup after the timeout.
425396
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
426397
bg.incomingMu.Lock()
398+
defer bg.incomingMu.Unlock()
427399
// Remove SubConns. This is only done after the balancer is
428400
// actually closed.
429401
//
@@ -437,18 +409,20 @@ func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
437409
delete(bg.scToSubBalancer, sc)
438410
}
439411
}
440-
bg.incomingMu.Unlock()
441412
}
442413

443414
// connect attempts to connect to all subConns belonging to sb.
444415
func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
445416
bg.incomingMu.Lock()
417+
defer bg.incomingMu.Unlock()
418+
if bg.incomingClosed {
419+
return
420+
}
446421
for sc, b := range bg.scToSubBalancer {
447422
if b == sb {
448423
sc.Connect()
449424
}
450425
}
451-
bg.incomingMu.Unlock()
452426
}
453427

454428
// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
@@ -457,6 +431,10 @@ func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
457431
// needed.
458432
func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
459433
bg.incomingMu.Lock()
434+
if bg.incomingClosed {
435+
bg.incomingMu.Unlock()
436+
return
437+
}
460438
if _, ok := bg.scToSubBalancer[sc]; !ok {
461439
bg.incomingMu.Unlock()
462440
return
@@ -468,10 +446,13 @@ func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.
468446
bg.incomingMu.Unlock()
469447

470448
bg.outgoingMu.Lock()
449+
defer bg.outgoingMu.Unlock()
450+
if bg.outgoingClosed {
451+
return
452+
}
471453
if cb != nil {
472454
cb(state)
473455
}
474-
bg.outgoingMu.Unlock()
475456
}
476457

477458
// UpdateSubConnState handles the state for the subconn. It finds the
@@ -485,6 +466,9 @@ func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.
485466
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
486467
bg.outgoingMu.Lock()
487468
defer bg.outgoingMu.Unlock()
469+
if bg.outgoingClosed {
470+
return nil
471+
}
488472
if config, ok := bg.idToBalancerConfig[id]; ok {
489473
return config.updateClientConnState(s)
490474
}
@@ -494,10 +478,13 @@ func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnS
494478
// ResolverError forwards resolver errors to all sub-balancers.
495479
func (bg *BalancerGroup) ResolverError(err error) {
496480
bg.outgoingMu.Lock()
481+
defer bg.outgoingMu.Unlock()
482+
if bg.outgoingClosed {
483+
return
484+
}
497485
for _, config := range bg.idToBalancerConfig {
498486
config.resolverError(err)
499487
}
500-
bg.outgoingMu.Unlock()
501488
}
502489

503490
// Following are actions from sub-balancers, forward to ClientConn.
@@ -514,9 +501,9 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
514501
// error. But since we call balancer.stopBalancer when removing the balancer, this
515502
// shouldn't happen.
516503
bg.incomingMu.Lock()
517-
if !bg.incomingStarted {
504+
if bg.incomingClosed {
518505
bg.incomingMu.Unlock()
519-
return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
506+
return nil, fmt.Errorf("balancergroup: NewSubConn is called after balancer group is closed")
520507
}
521508
var sc balancer.SubConn
522509
oldListener := opts.StateListener
@@ -547,31 +534,33 @@ func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
547534
}
548535

549536
// Close closes the balancer. It stops sub-balancers, and removes the subconns.
550-
// The BalancerGroup can be restarted later.
537+
// When a BalancerGroup is closed, it can not receive further address updates.
551538
func (bg *BalancerGroup) Close() {
552539
bg.incomingMu.Lock()
553-
if bg.incomingStarted {
554-
bg.incomingStarted = false
555-
// Also remove all SubConns.
556-
for sc := range bg.scToSubBalancer {
557-
sc.Shutdown()
558-
delete(bg.scToSubBalancer, sc)
559-
}
540+
bg.incomingClosed = true
541+
// Also remove all SubConns.
542+
for sc := range bg.scToSubBalancer {
543+
sc.Shutdown()
544+
delete(bg.scToSubBalancer, sc)
560545
}
561546
bg.incomingMu.Unlock()
562547

548+
bg.outgoingMu.Lock()
549+
// Setting `outgoingClosed` ensures that no entries are added to
550+
// `deletedBalancerCache` after this point.
551+
bg.outgoingClosed = true
552+
bg.outgoingMu.Unlock()
553+
563554
// Clear(true) runs clear function to close sub-balancers in cache. It
564555
// must be called out of outgoing mutex.
565556
if bg.deletedBalancerCache != nil {
566557
bg.deletedBalancerCache.Clear(true)
567558
}
568559

569560
bg.outgoingMu.Lock()
570-
if bg.outgoingStarted {
571-
bg.outgoingStarted = false
572-
for _, config := range bg.idToBalancerConfig {
573-
config.stopBalancer()
574-
}
561+
for id, config := range bg.idToBalancerConfig {
562+
config.stopBalancer()
563+
delete(bg.idToBalancerConfig, id)
575564
}
576565
bg.outgoingMu.Unlock()
577566
}
@@ -581,24 +570,30 @@ func (bg *BalancerGroup) Close() {
581570
// not supported.
582571
func (bg *BalancerGroup) ExitIdle() {
583572
bg.outgoingMu.Lock()
573+
defer bg.outgoingMu.Unlock()
574+
if bg.outgoingClosed {
575+
return
576+
}
584577
for _, config := range bg.idToBalancerConfig {
585578
if !config.exitIdle() {
586579
bg.connect(config)
587580
}
588581
}
589-
bg.outgoingMu.Unlock()
590582
}
591583

592584
// ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
593585
// appropriate and possible.
594586
func (bg *BalancerGroup) ExitIdleOne(id string) {
595587
bg.outgoingMu.Lock()
588+
defer bg.outgoingMu.Unlock()
589+
if bg.outgoingClosed {
590+
return
591+
}
596592
if config := bg.idToBalancerConfig[id]; config != nil {
597593
if !config.exitIdle() {
598594
bg.connect(config)
599595
}
600596
}
601-
bg.outgoingMu.Unlock()
602597
}
603598

604599
// ParseConfig parses a child config list and returns a LB config for the

0 commit comments

Comments
 (0)