Skip to content

xds/ringhash: use StateListener instead of UpdateSubConnState #6522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,12 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
HealthCheckEnabled: true,
StateListener: func(state balancer.SubConnState) { b.updateSubConnState(sc, state) },
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
b.logger.Warningf("Failed to create new SubConn: %v", err)
continue
Expand Down Expand Up @@ -256,7 +261,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in updateSubConnState.
}
}
return addrsUpdated
Expand Down Expand Up @@ -321,7 +326,11 @@ func (b *ringhashBalancer) ResolverError(err error) {
})
}

// UpdateSubConnState updates the per-SubConn state stored in the ring, and also
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// updateSubConnState updates the per-SubConn state stored in the ring, and also
// the aggregated state.
//
// It triggers an update to cc when:
Expand All @@ -332,7 +341,7 @@ func (b *ringhashBalancer) ResolverError(err error) {
// - the aggregated state is changed
// - the same picker will be sent again, but this update may trigger a re-pick
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if logger.V(2) {
b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
Expand Down
96 changes: 50 additions & 46 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {

func (s) TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
cc, _, p0 := setupTest(t, []resolver.Address{wantAddr1})
ring0 := p0.(*picker).ring

firstHash := ring0.items[0].hash
Expand All @@ -156,16 +156,16 @@ func (s) TestOneSubConn(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[0].sc.sc
sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Test pick with one backend.
p1 := <-cc.NewPickerCh
Expand All @@ -186,7 +186,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, _, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
Expand All @@ -200,16 +200,16 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
// The picked SubConn should be the second in the ring.
sc0 := ring0.items[1].sc.sc
sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
Expand All @@ -219,7 +219,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// Turn down the subConn in use.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p2 := <-cc.NewPickerCh
// Pick with the same hash should be queued, because the SubConn after the
// first picked is Idle.
Expand All @@ -228,16 +228,16 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// The third SubConn in the ring should connect.
sc1 := ring0.items[2].sc.sc
sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// New picks should all return this SubConn.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
Expand All @@ -248,19 +248,19 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// Now, after backoff, the first picked SubConn will turn Idle.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
// The picks above should have queued Connect() for the first picked
// SubConn, so this Idle state change will trigger a Connect().
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// After the first picked SubConn turn Ready, new picks should return it
// again (even though the second picked SubConn is also Ready).
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
Expand All @@ -279,7 +279,7 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, _, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
Expand All @@ -292,16 +292,16 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[1].sc.sc
sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// First hash should always pick sc0.
p1 := <-cc.NewPickerCh
Expand All @@ -318,14 +318,14 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc1 := ring0.items[2].sc.sc
sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// With the new generated picker, hash2 always picks sc1.
p2 := <-cc.NewPickerCh
Expand Down Expand Up @@ -419,58 +419,62 @@ func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
_, b, p0 := setupTest(t, wantAddrs)
_, _, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring

// ringhash won't tell SCs to connect until there is an RPC, so simulate
// one now.
p0.Pick(balancer.PickInfo{Ctx: context.Background()})

// Turn the first subconn to transient failure.
sc0 := ring0.items[0].sc.sc
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the second subconn to connect (because overall state is
// Connect (when one subconn is TF)).
sc1 := ring0.items[1].sc.sc
sc1 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Turn the second subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the third subconn to connect.
sc2 := ring0.items[2].sc.sc
sc2 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-sc2.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
}

// Turn the third subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the first subconn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Turn the third subconn to TF again.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// This will not trigger any new Connect() on the SubConns, because sc0 is
// still attempting to connect, and we only need one SubConn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc0)
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc1)
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-sc2.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc2)
case <-time.After(defaultTestShortTimeout):
}
Expand Down