Skip to content

Commit 7aceafc

Browse files
authored
balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn (#6493)
1 parent 4fe8d3d commit 7aceafc

File tree

13 files changed

+155
-54
lines changed

13 files changed

+155
-54
lines changed

balancer/balancer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ type SubConn interface {
115115
// creates a new one and returns it. Returns a close function which must
116116
// be called when the Producer is no longer needed.
117117
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
118+
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
119+
// allowed to complete. No future calls should be made on the SubConn.
120+
// One final state update will be delivered to the StateListener (or
121+
// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
122+
// indicate the shutdown operation. This may be delivered before
123+
// in-progress RPCs are complete and the actual connection is closed.
124+
Shutdown()
118125
}
119126

120127
// NewSubConnOptions contains options to create new SubConn.
@@ -161,6 +168,8 @@ type ClientConn interface {
161168
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
162169
// RemoveSubConn removes the SubConn from ClientConn.
163170
// The SubConn will be shutdown.
171+
//
172+
// Deprecated: use SubConn.Shutdown instead.
164173
RemoveSubConn(SubConn)
165174
// UpdateAddresses updates the addresses used in the passed in SubConn.
166175
// gRPC checks if the currently connected address is still in the new list.

balancer/base/balancer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}
4444

4545
func (sc *testSubConn) Connect() {}
4646

47+
func (sc *testSubConn) Shutdown() {}
48+
4749
func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
4850
return nil, nil
4951
}

balancer/grpclb/grpclb.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,13 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
319319
return connectivity.TransientFailure
320320
}
321321

322+
// UpdateSubConnState is unused; NewSubConn's options always specifies
323+
// updateSubConnState as the listener.
322324
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
325+
logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
326+
}
327+
328+
func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
323329
s := scs.ConnectivityState
324330
if logger.V(2) {
325331
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
@@ -373,8 +379,13 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop
373379
if forceRegeneratePicker || (lb.state != oldAggrState) {
374380
lb.regeneratePicker(resetDrop)
375381
}
382+
var cc balancer.ClientConn = lb.cc
383+
if lb.usePickFirst {
384+
// Bypass the caching layer that would wrap the picker.
385+
cc = lb.cc.ClientConn
386+
}
376387

377-
lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
388+
cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
378389
}
379390

380391
// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use

balancer/grpclb/grpclb_remote_balancer.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
126126
if oldUsePickFirst {
127127
// If old SubConn were created for pickfirst, bypass cache and
128128
// remove directly.
129-
lb.cc.cc.RemoveSubConn(sc)
129+
lb.cc.ClientConn.RemoveSubConn(sc)
130130
} else {
131131
lb.cc.RemoveSubConn(sc)
132132
}
@@ -144,16 +144,17 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
144144
}
145145
if sc != nil {
146146
if len(backendAddrs) == 0 {
147-
lb.cc.cc.RemoveSubConn(sc)
147+
lb.cc.ClientConn.RemoveSubConn(sc)
148148
delete(lb.subConns, scKey)
149149
return
150150
}
151-
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
151+
lb.cc.ClientConn.UpdateAddresses(sc, backendAddrs)
152152
sc.Connect()
153153
return
154154
}
155+
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
155156
// This bypasses the cc wrapper with SubConn cache.
156-
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
157+
sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts)
157158
if err != nil {
158159
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
159160
return
@@ -176,6 +177,8 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
176177

177178
if _, ok := lb.subConns[addrWithoutAttrs]; !ok {
178179
// Use addrWithMD to create the SubConn.
180+
var sc balancer.SubConn
181+
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
179182
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
180183
if err != nil {
181184
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
@@ -419,7 +422,7 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
419422
}
420423
}
421424
// Trigger a re-resolve when the stream errors.
422-
ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{})
425+
ccw.lb.cc.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
423426

424427
ccw.lb.mu.Lock()
425428
ccw.lb.remoteBalancerConnected = false

balancer/grpclb/grpclb_util.go

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,10 @@ const subConnCacheTime = time.Second * 10
9393
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
9494
// SubConns will be kept in cache for subConnCacheTime before being removed.
9595
//
96-
// Its new and remove methods are updated to do cache first.
96+
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
9797
type lbCacheClientConn struct {
98-
cc balancer.ClientConn
98+
balancer.ClientConn
99+
99100
timeout time.Duration
100101

101102
mu sync.Mutex
@@ -113,7 +114,7 @@ type subConnCacheEntry struct {
113114

114115
func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
115116
return &lbCacheClientConn{
116-
cc: cc,
117+
ClientConn: cc,
117118
timeout: subConnCacheTime,
118119
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
119120
subConnToAddr: make(map[balancer.SubConn]resolver.Address),
@@ -137,16 +138,27 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
137138
return entry.sc, nil
138139
}
139140

140-
scNew, err := ccc.cc.NewSubConn(addrs, opts)
141+
scNew, err := ccc.ClientConn.NewSubConn(addrs, opts)
141142
if err != nil {
142143
return nil, err
143144
}
145+
scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc}
144146

145147
ccc.subConnToAddr[scNew] = addrWithoutAttrs
146148
return scNew, nil
147149
}
148150

149151
func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
152+
sc.Shutdown()
153+
}
154+
155+
type lbCacheSubConn struct {
156+
balancer.SubConn
157+
ccc *lbCacheClientConn
158+
}
159+
160+
func (sc *lbCacheSubConn) Shutdown() {
161+
ccc := sc.ccc
150162
ccc.mu.Lock()
151163
defer ccc.mu.Unlock()
152164
addr, ok := ccc.subConnToAddr[sc]
@@ -160,7 +172,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
160172
// same address, and those SubConns are all removed. We remove sc
161173
// immediately here.
162174
delete(ccc.subConnToAddr, sc)
163-
ccc.cc.RemoveSubConn(sc)
175+
sc.SubConn.Shutdown()
164176
}
165177
return
166178
}
@@ -176,7 +188,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
176188
if entry.abortDeleting {
177189
return
178190
}
179-
ccc.cc.RemoveSubConn(sc)
191+
sc.SubConn.Shutdown()
180192
delete(ccc.subConnToAddr, sc)
181193
delete(ccc.subConnCache, addr)
182194
})
@@ -195,14 +207,28 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
195207
}
196208

197209
func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
198-
ccc.cc.UpdateState(s)
210+
s.Picker = &lbCachePicker{Picker: s.Picker}
211+
ccc.ClientConn.UpdateState(s)
199212
}
200213

201214
func (ccc *lbCacheClientConn) close() {
202215
ccc.mu.Lock()
216+
defer ccc.mu.Unlock()
203217
// Only cancel all existing timers. There's no need to remove SubConns.
204218
for _, entry := range ccc.subConnCache {
205219
entry.cancel()
206220
}
207-
ccc.mu.Unlock()
221+
}
222+
223+
type lbCachePicker struct {
224+
balancer.Picker
225+
}
226+
227+
func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) {
228+
res, err := cp.Picker.Pick(i)
229+
if err != nil {
230+
return res, err
231+
}
232+
res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn
233+
return res, nil
208234
}

balancer/grpclb/grpclb_util_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,13 @@ import (
3030

3131
type mockSubConn struct {
3232
balancer.SubConn
33+
mcc *mockClientConn
34+
}
35+
36+
func (msc *mockSubConn) Shutdown() {
37+
msc.mcc.mu.Lock()
38+
defer msc.mcc.mu.Unlock()
39+
delete(msc.mcc.subConns, msc)
3340
}
3441

3542
type mockClientConn struct {
@@ -46,17 +53,15 @@ func newMockClientConn() *mockClientConn {
4653
}
4754

4855
func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
49-
sc := &mockSubConn{}
56+
sc := &mockSubConn{mcc: mcc}
5057
mcc.mu.Lock()
5158
defer mcc.mu.Unlock()
5259
mcc.subConns[sc] = addrs[0]
5360
return sc, nil
5461
}
5562

5663
func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
57-
mcc.mu.Lock()
58-
defer mcc.mu.Unlock()
59-
delete(mcc.subConns, sc)
64+
sc.Shutdown()
6065
}
6166

6267
const testCacheTimeout = 100 * time.Millisecond

balancer_conn_wrappers.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
301301
return nil, err
302302
}
303303
acbw := &acBalancerWrapper{
304+
ccb: ccb,
304305
ac: ac,
305306
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
306307
stateListener: opts.StateListener,
@@ -370,7 +371,8 @@ func (ccb *ccBalancerWrapper) Target() string {
370371
// acBalancerWrapper is a wrapper on top of ac for balancers.
371372
// It implements balancer.SubConn interface.
372373
type acBalancerWrapper struct {
373-
ac *addrConn // read-only
374+
ac *addrConn // read-only
375+
ccb *ccBalancerWrapper // read-only
374376
stateListener func(balancer.SubConnState)
375377

376378
mu sync.Mutex
@@ -389,6 +391,10 @@ func (acbw *acBalancerWrapper) Connect() {
389391
go acbw.ac.connect()
390392
}
391393

394+
func (acbw *acBalancerWrapper) Shutdown() {
395+
acbw.ccb.RemoveSubConn(acbw)
396+
}
397+
392398
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
393399
// ready, blocks until it is or ctx expires. Returns an error when the context
394400
// expires or the addrConn is shut down.

internal/balancer/gracefulswitch/gracefulswitch.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -364,13 +364,7 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
364364
}
365365

366366
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
367-
bw.gsb.mu.Lock()
368-
if !bw.gsb.balancerCurrentOrPending(bw) {
369-
bw.gsb.mu.Unlock()
370-
return
371-
}
372-
bw.gsb.mu.Unlock()
373-
bw.gsb.cc.RemoveSubConn(sc)
367+
sc.Shutdown()
374368
}
375369

376370
func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {

internal/testutils/balancer.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type testingLogger interface {
3838

3939
// TestSubConn implements the SubConn interface, to be used in tests.
4040
type TestSubConn struct {
41+
tcc *TestClientConn // the CC that owns this SubConn
4142
id string
4243
ConnectCh chan struct{}
4344
stateListener func(balancer.SubConnState)
@@ -81,6 +82,16 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
8182
}
8283
}
8384

85+
// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent
86+
// TestClientConn.
87+
func (tsc *TestSubConn) Shutdown() {
88+
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
89+
select {
90+
case tsc.tcc.RemoveSubConnCh <- tsc:
91+
default:
92+
}
93+
}
94+
8495
// String implements stringer to print human friendly error message.
8596
func (tsc *TestSubConn) String() string {
8697
return tsc.id
@@ -121,6 +132,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
121132
// NewSubConn creates a new SubConn.
122133
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
123134
sc := &TestSubConn{
135+
tcc: tcc,
124136
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
125137
ConnectCh: make(chan struct{}, 1),
126138
stateListener: o.StateListener,
@@ -143,11 +155,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
143155

144156
// RemoveSubConn removes the SubConn.
145157
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
146-
tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
147-
select {
148-
case tcc.RemoveSubConnCh <- sc.(*TestSubConn):
149-
default:
150-
}
158+
sc.(*TestSubConn).Shutdown()
151159
}
152160

153161
// UpdateAddresses updates the addresses on the SubConn.

test/balancer_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,3 +1061,59 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) {
10611061
t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled)
10621062
}
10631063
}
1064+
1065+
// TestSubConnShutdown confirms that the Shutdown method on subconns properly
1066+
// initiates their shutdown.
1067+
func (s) TestSubConnShutdown(t *testing.T) {
1068+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1069+
defer cancel()
1070+
1071+
gotShutdown := grpcsync.NewEvent()
1072+
1073+
bf := stub.BalancerFuncs{
1074+
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
1075+
var sc balancer.SubConn
1076+
opts := balancer.NewSubConnOptions{
1077+
StateListener: func(scs balancer.SubConnState) {
1078+
switch scs.ConnectivityState {
1079+
case connectivity.Connecting:
1080+
// Ignored.
1081+
case connectivity.Ready:
1082+
sc.Shutdown()
1083+
case connectivity.Shutdown:
1084+
gotShutdown.Fire()
1085+
default:
1086+
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
1087+
}
1088+
},
1089+
}
1090+
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
1091+
if err != nil {
1092+
return err
1093+
}
1094+
sc.Connect()
1095+
// Report the state as READY to unblock ss.Start(), which waits for ready.
1096+
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
1097+
return nil
1098+
},
1099+
}
1100+
1101+
const testBalName = "shutdown-test-balancer"
1102+
stub.Register(testBalName, bf)
1103+
t.Logf("Registered balancer %s...", testBalName)
1104+
1105+
ss := &stubserver.StubServer{}
1106+
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
1107+
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
1108+
)); err != nil {
1109+
t.Fatalf("Error starting endpoint server: %v", err)
1110+
}
1111+
defer ss.Stop()
1112+
1113+
select {
1114+
case <-gotShutdown.Done():
1115+
// Success
1116+
case <-ctx.Done():
1117+
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
1118+
}
1119+
}

0 commit comments

Comments
 (0)