Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer/base: consider an empty address list an error #3361

Merged
merged 5 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package base
import (
"context"
"errors"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -76,20 +77,33 @@ type baseBalancer struct {
picker balancer.Picker
v2Picker balancer.V2Picker
config Config

resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
}

// HandleResolvedAddrs is not supported by baseBalancer: it unconditionally
// panics with "not implemented", ignoring both addrs and err.
// NOTE(review): presumably resolver results are expected to arrive through
// UpdateClientConnState instead — confirm against the balancer interface.
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not implemented")
}

func (b *baseBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
if b.picker != nil {
b.picker = NewErrPicker(err)
} else {
b.v2Picker = NewErrPickerV2(err)
}
b.resolverErr = err
if len(b.subConns) == 0 {
b.state = connectivity.TransientFailure
}
if b.state != connectivity.TransientFailure {
// The picker will not change since the balancer does not currently
// report an error.
return
}
b.regeneratePicker()
if b.picker != nil {
b.cc.UpdateBalancerState(b.state, b.picker)
} else {
b.cc.UpdateState(balancer.State{
ConnectivityState: b.state,
Picker: b.v2Picker,
})
}
}

Expand All @@ -99,6 +113,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
}
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.ResolverState.Addresses {
Expand Down Expand Up @@ -127,24 +147,30 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return nil
}

// mergeErrors combines the most recent connection error and the most recent
// resolver error into one descriptive error. Callers must only invoke it while
// b.state is TransientFailure.
func (b *baseBalancer) mergeErrors() error {
	// Expected invariant: connErr is non-nil whenever SubConns exist; when
	// there are no SubConns, resolverErr is non-nil instead.
	switch {
	case b.connErr == nil:
		// Only the resolver has something to report.
		return fmt.Errorf("last resolver error: %v", b.resolverErr)
	case b.resolverErr == nil:
		// Only the connection has something to report.
		return fmt.Errorf("last connection error: %v", b.connErr)
	default:
		// Both are set: report the connection error first, then the resolver error.
		return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
	}
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker is
// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - errPicker if the balancer is in TransientFailure,
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker(err error) {
func (b *baseBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
if b.pickerBuilder != nil {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
} else {
if err != nil {
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
} else {
// This means the last subchannel transition was not to
// TransientFailure (otherwise err must be set), but the
// aggregate state of the balancer is TransientFailure, meaning
// there are no other addresses.
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
}
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(b.mergeErrors()))
}
return
}
Expand Down Expand Up @@ -200,14 +226,17 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
oldAggrState := b.state
b.state = b.csEvltr.RecordTransition(oldS, s)

// Set or clear the last connection error accordingly.
b.connErr = state.ConnectionError

// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker(state.ConnectionError)
b.regeneratePicker()
}

if b.picker != nil {
Expand Down
32 changes: 25 additions & 7 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"context"
"fmt"
"net"
"strings"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/peer"
Expand Down Expand Up @@ -210,15 +212,31 @@ func TestAddressesRemoved(t *testing.T) {
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
return
// Removing addresses results in an error reported to the clientconn, but
// the existing connections remain. RPCs should still succeed.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
}

// Stop the server to bring the channel state into transient failure.
test.cleanup()
// Wait for not-ready.
for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
if !cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
// Report an empty server list again; because the state is not ready, the
// empty address list error should surface to the user.
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})

const msgWant = "produced zero addresses"
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
}

}

func TestCloseWithPendingRPC(t *testing.T) {
Expand Down
31 changes: 31 additions & 0 deletions balancer_conn_wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package grpc

import (
"fmt"
"net"
"testing"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -88,3 +90,32 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) {
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
}

// TestRoundRobinZeroAddressesResolverPolling hands the round robin balancer an
// empty address list and checks that ResolveNow is then invoked on the
// resolver, with the backoff strategy consulted between ResolveNow calls.
func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) {
	// A real server must be listening; otherwise the connecting loop would
	// call ResolveNow after every iteration, even once a valid resolver
	// result has been returned.
	listener, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	defer listener.Close()
	srv := NewServer()
	defer srv.Stop()
	go srv.Serve(listener)

	// Select round robin through the default service config.
	dialOpt := WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))

	// An update with no addresses makes the balancer report a failure.
	reportNoAddrs := func(r *manual.Resolver) {
		r.CC.UpdateState(resolver.State{})
	}

	// UpdateState blocks while ResolveNow is in progress (which blocks on
	// rn), so it is issued from a goroutine. The update carries a valid
	// address so the balancer accepts it.
	reportValidAddr := func(r *manual.Resolver) {
		go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: listener.Addr().String()}}})
	}

	testResolverErrorPolling(t, reportNoAddrs, reportValidAddr, dialOpt)
}
47 changes: 31 additions & 16 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,15 +1515,29 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err)
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
// Wait for ready
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
}
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
// Wait for not-ready.
for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
}

if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
if len(tcs[0].SubChans) != 0 {
return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
if len(tcs[0].SubChans) != 1 {
return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
}
scm := channelz.GetSubChannel(subConn)
if scm == nil {
Expand Down Expand Up @@ -1770,7 +1784,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
}
transient = 0
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
for _, e := range scm.Trace.Events {
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
ready++
Expand Down Expand Up @@ -1866,7 +1880,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
// avoid calling API to set balancer type, which will void service config's change of balancer.
// avoid newTest using WithBalancer, which would override service config's change of balancer below.
e.balancer = ""
te := newTest(t, e)
channelz.SetMaxTraceEntry(1)
Expand Down Expand Up @@ -1956,19 +1970,20 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
t.Fatal(err)
}

r.UpdateState(resolver.State{Addresses: []resolver.Address{}})

if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
// Wait for ready
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
}
if len(tcs[0].SubChans) != 0 {
return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
// Wait for not-ready.
for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
if !te.cc.WaitForStateChange(ctx, src) {
t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
}
return true, nil
}); err != nil {
t.Fatal(err)
}

// verify that the subchannel no longer exist due to trace referencing it got overwritten.
Expand Down
4 changes: 2 additions & 2 deletions test/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
default:
}
// trigger teardown of the ac
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})

select {
case <-hcExitChan:
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
t.Fatal("Health check function has not been invoked after 5s.")
}
// trigger teardown of the ac, ac in SHUTDOWN state
r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})

// The health check func should exit without calling the setConnectivityState func, as server hasn't sent
// any response.
Expand Down