Skip to content

Commit bb41067

Browse files
authored
balancergroup: do not cache closed sub-balancers by default (#6523)
1 parent 68704f8 commit bb41067

File tree

11 files changed

+173
-96
lines changed

11 files changed

+173
-96
lines changed

balancer/rls/balancer.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,13 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
104104
}
105105
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
106106
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
107-
lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
107+
lb.bg = balancergroup.New(balancergroup.Options{
108+
CC: cc,
109+
BuildOpts: opts,
110+
StateAggregator: lb,
111+
Logger: lb.logger,
112+
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
113+
})
108114
lb.bg.Start()
109115
go lb.run()
110116
return lb

balancer/rls/helpers_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
2929
"google.golang.org/grpc/codes"
3030
"google.golang.org/grpc/internal"
31-
"google.golang.org/grpc/internal/balancergroup"
3231
"google.golang.org/grpc/internal/grpcsync"
3332
"google.golang.org/grpc/internal/grpctest"
3433
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
@@ -48,10 +47,6 @@ const (
4847
defaultTestShortTimeout = 100 * time.Millisecond
4948
)
5049

51-
func init() {
52-
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
53-
}
54-
5550
type s struct {
5651
grpctest.Tester
5752
}

balancer/weightedtarget/weightedtarget.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package weightedtarget
2424
import (
2525
"encoding/json"
2626
"fmt"
27+
"time"
2728

2829
"google.golang.org/grpc/balancer"
2930
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
@@ -54,7 +55,13 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
5455
b.logger = prefixLogger(b)
5556
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
5657
b.stateAggregator.Start()
57-
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger)
58+
b.bg = balancergroup.New(balancergroup.Options{
59+
CC: cc,
60+
BuildOpts: bOpts,
61+
StateAggregator: b.stateAggregator,
62+
Logger: b.logger,
63+
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
64+
})
5865
b.bg.Start()
5966
b.logger.Infof("Created")
6067
return b

balancer/weightedtarget/weightedtarget_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"google.golang.org/grpc/balancer/roundrobin"
3434
"google.golang.org/grpc/connectivity"
3535
"google.golang.org/grpc/internal/balancer/stub"
36-
"google.golang.org/grpc/internal/balancergroup"
3736
"google.golang.org/grpc/internal/grpctest"
3837
"google.golang.org/grpc/internal/hierarchy"
3938
"google.golang.org/grpc/internal/testutils"
@@ -159,7 +158,6 @@ func init() {
159158
wtbBuilder = balancer.Get(Name)
160159
wtbParser = wtbBuilder.(balancer.ConfigParser)
161160

162-
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
163161
NewRandomWRR = testutils.NewTestWRR
164162
}
165163

internal/balancergroup/balancergroup.go

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,9 @@ type BalancerGroup struct {
213213
outgoingMu sync.Mutex
214214
outgoingStarted bool
215215
idToBalancerConfig map[string]*subBalancerWrapper
216-
// Cache for sub-balancers when they are removed.
217-
balancerCache *cache.TimeoutCache
216+
// Cache for sub-balancers when they are removed. This is `nil` if caching
217+
// is disabled by passing `0` for Options.SubBalancerCloseTimeout`.
218+
deletedBalancerCache *cache.TimeoutCache
218219

219220
// incomingMu is to make sure this balancer group doesn't send updates to cc
220221
// after it's closed.
@@ -244,24 +245,40 @@ type BalancerGroup struct {
244245
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
245246
}
246247

247-
// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
248-
// testing.
249-
//
250-
// TODO: make it a parameter for New().
251-
var DefaultSubBalancerCloseTimeout = 15 * time.Minute
248+
// Options wraps the arguments to be passed to the BalancerGroup ctor.
249+
type Options struct {
250+
// CC is a reference to the parent balancer.ClientConn.
251+
CC balancer.ClientConn
252+
// BuildOpts contains build options to be used when creating sub-balancers.
253+
BuildOpts balancer.BuildOptions
254+
// StateAggregator is an implementation of the BalancerStateAggregator
255+
// interface to aggregate picker and connectivity states from sub-balancers.
256+
StateAggregator BalancerStateAggregator
257+
// Logger is a group specific prefix logger.
258+
Logger *grpclog.PrefixLogger
259+
// SubBalancerCloseTimeout is the amount of time deleted sub-balancers spend
260+
// in the idle cache. A value of zero here disables caching of deleted
261+
// sub-balancers.
262+
SubBalancerCloseTimeout time.Duration
263+
}
252264

253265
// New creates a new BalancerGroup. Note that the BalancerGroup
254266
// needs to be started to work.
255-
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup {
256-
return &BalancerGroup{
257-
cc: cc,
258-
buildOpts: bOpts,
259-
logger: logger,
260-
stateAggregator: stateAggregator,
267+
func New(opts Options) *BalancerGroup {
268+
var bc *cache.TimeoutCache
269+
if opts.SubBalancerCloseTimeout != time.Duration(0) {
270+
bc = cache.NewTimeoutCache(opts.SubBalancerCloseTimeout)
271+
}
261272

262-
idToBalancerConfig: make(map[string]*subBalancerWrapper),
263-
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
264-
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
273+
return &BalancerGroup{
274+
cc: opts.CC,
275+
buildOpts: opts.BuildOpts,
276+
stateAggregator: opts.StateAggregator,
277+
logger: opts.Logger,
278+
279+
deletedBalancerCache: bc,
280+
idToBalancerConfig: make(map[string]*subBalancerWrapper),
281+
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
265282
}
266283
}
267284

@@ -307,9 +324,10 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
307324
defer bg.outgoingMu.Unlock()
308325
var sbc *subBalancerWrapper
309326
// If outgoingStarted is true, search in the cache. Otherwise, cache is
310-
// guaranteed to be empty, searching is unnecessary.
311-
if bg.outgoingStarted {
312-
if old, ok := bg.balancerCache.Remove(id); ok {
327+
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
328+
// caching is disabled.
329+
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
330+
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
313331
sbc, _ = old.(*subBalancerWrapper)
314332
if sbc != nil && sbc.builder != builder {
315333
// If the sub-balancer in cache was built with a different
@@ -380,28 +398,47 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
380398
// subconns) will be done after timeout.
381399
func (bg *BalancerGroup) Remove(id string) {
382400
bg.logger.Infof("Removing child policy for locality %q", id)
401+
383402
bg.outgoingMu.Lock()
384-
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
385-
if bg.outgoingStarted {
386-
bg.balancerCache.Add(id, sbToRemove, func() {
387-
// A sub-balancer evicted from the timeout cache needs to closed
388-
// and its subConns need to removed, unconditionally. There is a
389-
// possibility that a sub-balancer might be removed (thereby
390-
// moving it to the cache) around the same time that the
391-
// balancergroup is closed, and by the time we get here the
392-
// balancergroup might be closed. Check for `outgoingStarted ==
393-
// true` at that point can lead to a leaked sub-balancer.
394-
bg.outgoingMu.Lock()
395-
sbToRemove.stopBalancer()
396-
bg.outgoingMu.Unlock()
397-
bg.cleanupSubConns(sbToRemove)
398-
})
399-
}
400-
delete(bg.idToBalancerConfig, id)
401-
} else {
403+
404+
sbToRemove, ok := bg.idToBalancerConfig[id]
405+
if !ok {
402406
bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
407+
bg.outgoingMu.Unlock()
408+
return
409+
}
410+
411+
// Unconditionally remove the sub-balancer config from the map.
412+
delete(bg.idToBalancerConfig, id)
413+
if !bg.outgoingStarted {
414+
// Nothing needs to be done here, since we wouldn't have created the
415+
// sub-balancer.
416+
bg.outgoingMu.Unlock()
417+
return
403418
}
419+
420+
if bg.deletedBalancerCache != nil {
421+
bg.deletedBalancerCache.Add(id, sbToRemove, func() {
422+
// A sub-balancer evicted from the timeout cache needs to closed
423+
// and its subConns need to removed, unconditionally. There is a
424+
// possibility that a sub-balancer might be removed (thereby
425+
// moving it to the cache) around the same time that the
426+
// balancergroup is closed, and by the time we get here the
427+
// balancergroup might be closed. Check for `outgoingStarted ==
428+
// true` at that point can lead to a leaked sub-balancer.
429+
bg.outgoingMu.Lock()
430+
sbToRemove.stopBalancer()
431+
bg.outgoingMu.Unlock()
432+
bg.cleanupSubConns(sbToRemove)
433+
})
434+
bg.outgoingMu.Unlock()
435+
return
436+
}
437+
438+
// Remove the sub-balancer with immediate effect if we are not caching.
439+
sbToRemove.stopBalancer()
404440
bg.outgoingMu.Unlock()
441+
bg.cleanupSubConns(sbToRemove)
405442
}
406443

407444
// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
@@ -546,7 +583,9 @@ func (bg *BalancerGroup) Close() {
546583

547584
// Clear(true) runs clear function to close sub-balancers in cache. It
548585
// must be called out of outgoing mutex.
549-
bg.balancerCache.Clear(true)
586+
if bg.deletedBalancerCache != nil {
587+
bg.deletedBalancerCache.Clear(true)
588+
}
550589

551590
bg.outgoingMu.Lock()
552591
if bg.outgoingStarted {

0 commit comments

Comments
 (0)