Skip to content

Commit

Permalink
balancer/base: consider an empty address list an error
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Feb 4, 2020
1 parent 3ae60eb commit e6e7130
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 27 deletions.
16 changes: 14 additions & 2 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"errors"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

type baseBuilder struct {
Expand Down Expand Up @@ -85,10 +87,16 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
func (b *baseBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
// Set a failing picker if we don't have a good picker.
if b.picker != nil {
b.picker = NewErrPicker(err)
b.picker = NewErrPicker(status.Errorf(codes.Unavailable, "name resolver error: %v", err))
b.cc.UpdateBalancerState(connectivity.TransientFailure, b.picker)
} else {
b.v2Picker = NewErrPickerV2(err)
b.v2Picker = NewErrPickerV2(status.Errorf(codes.Unavailable, "name resolver error: %v", err))
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: b.v2Picker,
})
}
}
}
Expand All @@ -99,6 +107,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
}
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.ResolverState.Addresses {
Expand Down
32 changes: 25 additions & 7 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/peer"
Expand Down Expand Up @@ -210,15 +212,31 @@ func TestAddressesRemoved(t *testing.T) {
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
return
// Removing addresses results in an error reported to the clientconn, but
// the existing connections remain. RPCs should still succeed.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}

// Stop the server to bring the channel state into transient failure.
test.cleanup()
// Wait for not-ready.
for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
if !cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
// Report an empty server list again; because the state is not ready, the
// empty address list error should surface to the user.
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})

const msgWant = "produced zero addresses"
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
}

}

func TestCloseWithPendingRPC(t *testing.T) {
Expand Down
31 changes: 31 additions & 0 deletions balancer_conn_wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package grpc

import (
"fmt"
"net"
"testing"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -88,3 +90,32 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) {
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
}

// TestRoundRobinZeroAddressesResolverPolling reports no addresses to the round
// robin balancer and verifies ResolveNow is called on the resolver with the
// appropriate backoff strategy being consulted between ResolveNow calls.
func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) {
// We need to start a real server or else the connecting loop will call
// ResolveNow after every iteration, even after a valid resolver result is
// returned.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
s := NewServer()
defer s.Stop()
go s.Serve(lis)

testResolverErrorPolling(t,
func(r *manual.Resolver) {
// No addresses so the balancer will fail.
r.CC.UpdateState(resolver.State{})
}, func(r *manual.Resolver) {
// UpdateState will block if ResolveNow is being called (which
// blocks on rn), so call it in a goroutine. Include a valid
// address so the balancer will be happy.
go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
}
47 changes: 31 additions & 16 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,15 +1515,29 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err)
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
// Wait for ready
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
}
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
// Wait for not-ready.
for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
}

if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
if len(tcs[0].SubChans) != 0 {
return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
if len(tcs[0].SubChans) != 1 {
return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
}
scm := channelz.GetSubChannel(subConn)
if scm == nil {
Expand Down Expand Up @@ -1770,7 +1784,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
}
transient = 0
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
for _, e := range scm.Trace.Events {
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
ready++
Expand Down Expand Up @@ -1866,7 +1880,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
// avoid calling API to set balancer type, which will void service config's change of balancer.
// avoid newTest using WithBalancer, which would override service config's change of balancer below.
e.balancer = ""
te := newTest(t, e)
channelz.SetMaxTraceEntry(1)
Expand Down Expand Up @@ -1956,19 +1970,20 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
t.Fatal(err)
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})

if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
// Wait for ready
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
}
if len(tcs[0].SubChans) != 0 {
return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
// Wait for not-ready.
for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
return true, nil
}); err != nil {
t.Fatal(err)
}

// verify that the subchannel no longer exist due to trace referencing it got overwritten.
Expand Down
4 changes: 2 additions & 2 deletions test/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
default:
}
// trigger teardown of the ac
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})

select {
case <-hcExitChan:
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
t.Fatal("Health check function has not been invoked after 5s.")
}
// trigger teardown of the ac, ac in SHUTDOWN state
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})

// The health check func should exit without calling the setConnectivityState func, as server hasn't sent
// any response.
Expand Down

0 comments on commit e6e7130

Please sign in to comment.