Skip to content

Commit bff5391

Browse files
committed
base: update base balancer for new APIs
1 parent c88a003 commit bff5391

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

balancer/base/balancer.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
105105
addrsSet.Set(a, nil)
106106
if _, ok := b.subConns.Get(a); !ok {
107107
// a is a new address (not existing in b.subConns).
108-
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
108+
var sc balancer.SubConn
109+
opts := balancer.NewSubConnOptions{
110+
HealthCheckEnabled: b.config.HealthCheck,
111+
StateListener: func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) },
112+
}
113+
sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)
109114
if err != nil {
110115
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
111116
continue
@@ -121,10 +126,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
121126
sc := sci.(balancer.SubConn)
122127
// a was removed by resolver.
123128
if _, ok := addrsSet.Get(a); !ok {
124-
b.cc.RemoveSubConn(sc)
129+
sc.Shutdown()
125130
b.subConns.Delete(a)
126131
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
127-
// The entry will be deleted in UpdateSubConnState.
132+
// The entry will be deleted in updateSubConnState.
128133
}
129134
}
130135
// If resolver state contains no addresses, return an error so ClientConn
@@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {
177182
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
178183
}
179184

185+
// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.
180186
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
187+
logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
188+
}
189+
190+
func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
181191
s := state.ConnectivityState
182192
if logger.V(2) {
183193
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
@@ -204,8 +214,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
204214
case connectivity.Idle:
205215
sc.Connect()
206216
case connectivity.Shutdown:
207-
// When an address was removed by resolver, b called RemoveSubConn but
208-
// kept the sc's state in scStates. Remove state for this sc here.
217+
// When an address was removed by resolver, b called Shutdown but kept
218+
// the sc's state in scStates. Remove state for this sc here.
209219
delete(b.scStates, sc)
210220
case connectivity.TransientFailure:
211221
// Save error to be reported via picker.
@@ -226,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
226236
}
227237

228238
// Close is a nop because base balancer doesn't have internal state to clean up,
229-
// and it doesn't need to call RemoveSubConn for the SubConns.
239+
// and it doesn't need to call Shutdown for the SubConns.
230240
func (b *baseBalancer) Close() {
231241
}
232242

balancer/base/balancer_test.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package base
2020

2121
import (
22+
"context"
2223
"testing"
24+
"time"
2325

2426
"google.golang.org/grpc/attributes"
2527
"google.golang.org/grpc/balancer"
@@ -38,7 +40,9 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS
3840

3941
func (c *testClientConn) UpdateState(balancer.State) {}
4042

41-
type testSubConn struct{}
43+
type testSubConn struct {
44+
updateState func(balancer.SubConnState)
45+
}
4246

4347
func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}
4448

@@ -61,7 +65,11 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker {
6165
}
6266

6367
func TestBaseBalancerReserveAttributes(t *testing.T) {
64-
var v = func(info PickerBuildInfo) {
68+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
69+
defer cancel()
70+
validated := make(chan struct{}, 1)
71+
v := func(info PickerBuildInfo) {
72+
defer func() { validated <- struct{}{} }()
6573
for _, sc := range info.ReadySCs {
6674
if sc.Address.Addr == "1.1.1.1" {
6775
if sc.Address.Attributes == nil {
@@ -80,8 +88,8 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
8088
}
8189
pickBuilder := &testPickBuilder{validate: v}
8290
b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{
83-
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
84-
return &testSubConn{}, nil
91+
newSubConn: func(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
92+
return &testSubConn{updateState: opts.StateListener}, nil
8593
},
8694
}, balancer.BuildOptions{}).(*baseBalancer)
8795

@@ -93,8 +101,18 @@ func TestBaseBalancerReserveAttributes(t *testing.T) {
93101
},
94102
},
95103
})
104+
select {
105+
case <-validated:
106+
case <-ctx.Done():
107+
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
108+
}
96109

97110
for sc := range b.scStates {
98-
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
111+
sc.(*testSubConn).updateState(balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil})
112+
select {
113+
case <-validated:
114+
case <-ctx.Done():
115+
t.Fatalf("timed out waiting for UpdateClientConnState to call picker.Build")
116+
}
99117
}
100118
}

0 commit comments

Comments
 (0)