From cb03b9f65c14e57077a8c8508c4431ebc6521b1a Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 5 Feb 2020 13:16:31 -0800 Subject: [PATCH] balancer/base: consider an empty address list an error (#3361) --- balancer/base/balancer.go | 67 ++++++++++++++++++-------- balancer/roundrobin/roundrobin_test.go | 32 +++++++++--- balancer_conn_wrappers_test.go | 31 ++++++++++++ test/channelz_test.go | 47 ++++++++++++------ test/healthcheck_test.go | 4 +- 5 files changed, 137 insertions(+), 44 deletions(-) diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index d952f09f345a..d7d72918ad69 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -21,6 +21,7 @@ package base import ( "context" "errors" + "fmt" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" @@ -76,6 +77,9 @@ type baseBalancer struct { picker balancer.Picker v2Picker balancer.V2Picker config Config + + resolverErr error // the last error reported by the resolver; cleared on successful resolution + connErr error // the last connection error; cleared upon leaving TransientFailure } func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { @@ -83,13 +87,23 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) } func (b *baseBalancer) ResolverError(err error) { - switch b.state { - case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: - if b.picker != nil { - b.picker = NewErrPicker(err) - } else { - b.v2Picker = NewErrPickerV2(err) - } + b.resolverErr = err + if len(b.subConns) == 0 { + b.state = connectivity.TransientFailure + } + if b.state != connectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. 
+ return + } + b.regeneratePicker() + if b.picker != nil { + b.cc.UpdateBalancerState(b.state, b.picker) + } else { + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.state, + Picker: b.v2Picker, + }) } } @@ -99,6 +113,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { if grpclog.V(2) { grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s) } + if len(s.ResolverState.Addresses) == 0 { + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } + // Successful resolution; clear resolver error and ensure we return nil. + b.resolverErr = nil // addrsSet is the set converted from addrs, it's used for quick lookup of an address. addrsSet := make(map[resolver.Address]struct{}) for _, a := range s.ResolverState.Addresses { @@ -127,24 +147,30 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { return nil } +// mergeErrors builds an error from the last connection error and the last +// resolver error. Must only be called if b.state is TransientFailure. +func (b *baseBalancer) mergeErrors() error { + // connErr must always be non-nil unless there are no SubConns, in which + // case resolverErr must be non-nil. + if b.connErr == nil { + return fmt.Errorf("last resolver error: %v", b.resolverErr) + } + if b.resolverErr == nil { + return fmt.Errorf("last connection error: %v", b.connErr) + } + return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr) +} + // regeneratePicker takes a snapshot of the balancer, and generates a picker // from it. The picker is -// - errPicker with ErrTransientFailure if the balancer is in TransientFailure, +// - errPicker if the balancer is in TransientFailure, // - built by the pickerBuilder with all READY SubConns otherwise. 
-func (b *baseBalancer) regeneratePicker(err error) { +func (b *baseBalancer) regeneratePicker() { if b.state == connectivity.TransientFailure { if b.pickerBuilder != nil { b.picker = NewErrPicker(balancer.ErrTransientFailure) } else { - if err != nil { - b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err)) - } else { - // This means the last subchannel transition was not to - // TransientFailure (otherwise err must be set), but the - // aggregate state of the balancer is TransientFailure, meaning - // there are no other addresses. - b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses"))) - } + b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(b.mergeErrors())) } return } @@ -200,6 +226,9 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su oldAggrState := b.state b.state = b.csEvltr.RecordTransition(oldS, s) + // Set or clear the last connection error accordingly. + b.connErr = state.ConnectionError + // Regenerate picker when one of the following happens: // - this sc became ready from not-ready // - this sc became not-ready from ready @@ -207,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su // - the aggregated state of balancer became non-TransientFailure from TransientFailure if (s == connectivity.Ready) != (oldS == connectivity.Ready) || (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { - b.regeneratePicker(state.ConnectionError) + b.regeneratePicker() } if b.picker != nil { diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 4f13bdd78976..f43fb69edd78 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "net" + "strings" "sync" "testing" "time" @@ -29,6 +30,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" 
"google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/peer" @@ -210,15 +212,31 @@ func TestAddressesRemoved(t *testing.T) { } r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) - for i := 0; i < 1000; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { - return + // Removing addresses results in an error reported to the clientconn, but + // the existing connections remain. RPCs should still succeed. + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err) + } + + // Stop the server to bring the channel state into transient failure. + test.cleanup() + // Wait for not-ready. + for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() { + if !cc.WaitForStateChange(ctx, src) { + t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready) } - time.Sleep(time.Millisecond) } - t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded") + // Report an empty server list again; because the state is not ready, the + // empty address list error should surface to the user. 
+ r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + + const msgWant = "produced zero addresses" + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) { + t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant) + } + } func TestCloseWithPendingRPC(t *testing.T) { diff --git a/balancer_conn_wrappers_test.go b/balancer_conn_wrappers_test.go index a29a7dd8fade..33a439f806d1 100644 --- a/balancer_conn_wrappers_test.go +++ b/balancer_conn_wrappers_test.go @@ -20,9 +20,11 @@ package grpc import ( "fmt" + "net" "testing" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -88,3 +90,32 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) { }, WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName))) } + +// TestRoundRobinZeroAddressesResolverPolling reports no addresses to the round +// robin balancer and verifies ResolveNow is called on the resolver with the +// appropriate backoff strategy being consulted between ResolveNow calls. +func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) { + // We need to start a real server or else the connecting loop will call + // ResolveNow after every iteration, even after a valid resolver result is + // returned. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer lis.Close() + s := NewServer() + defer s.Stop() + go s.Serve(lis) + + testResolverErrorPolling(t, + func(r *manual.Resolver) { + // No addresses so the balancer will fail. + r.CC.UpdateState(resolver.State{}) + }, func(r *manual.Resolver) { + // UpdateState will block if ResolveNow is being called (which + // blocks on rn), so call it in a goroutine. 
Include a valid + // address so the balancer will be happy. + go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) + }, + WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) +} diff --git a/test/channelz_test.go b/test/channelz_test.go index 45021022529d..744284de3039 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1515,15 +1515,29 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { t.Fatal(err) } - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + // Wait for ready + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() { + if !te.cc.WaitForStateChange(ctx, src) { + t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready) + } + } + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) + // Wait for not-ready. + for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() { + if !te.cc.WaitForStateChange(ctx, src) { + t.Fatalf("timed out waiting for state change. 
got %v; want !%v", src, connectivity.Ready) + } + } if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0, 0) if len(tcs) != 1 { return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) } - if len(tcs[0].SubChans) != 0 { - return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans)) + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans)) } scm := channelz.GetSubChannel(subConn) if scm == nil { @@ -1770,7 +1784,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { return false, fmt.Errorf("transient failure has not happened on SubChannel yet") } transient = 0 - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) for _, e := range scm.Trace.Events { if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) { ready++ @@ -1866,7 +1880,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) { czCleanup := channelz.NewChannelzStorage() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv - // avoid calling API to set balancer type, which will void service config's change of balancer. + // avoid newTest using WithBalancer, which would override service config's change of balancer below. 
e.balancer = "" te := newTest(t, e) channelz.SetMaxTraceEntry(1) @@ -1956,19 +1970,20 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) { t.Fatal(err) } - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) - - if err := verifyResultWithDelay(func() (bool, error) { - tcs, _ := channelz.GetTopChannels(0, 0) - if len(tcs) != 1 { - return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + // Wait for ready + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() { + if !te.cc.WaitForStateChange(ctx, src) { + t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready) } - if len(tcs[0].SubChans) != 0 { - return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans)) + } + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) + // Wait for not-ready. + for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() { + if !te.cc.WaitForStateChange(ctx, src) { + t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready) } - return true, nil - }); err != nil { - t.Fatal(err) } // verify that the subchannel no longer exist due to trace referencing it got overwritten. 
diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 1aa9d6b47c20..a08718d0971c 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -507,7 +507,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { default: } // trigger teardown of the ac - r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc}) + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc}) select { case <-hcExitChan: @@ -653,7 +653,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes t.Fatal("Health check function has not been invoked after 5s.") } // trigger teardown of the ac, ac in SHUTDOWN state - r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc}) + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc}) // The health check func should exit without calling the setConnectivityState func, as server hasn't sent // any response.