diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a5c6e06e2550..8ec6c95747b3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -7,7 +7,7 @@ If you are new to github, please start by reading [Pull Request howto](https://h ## Legal requirements In order to protect both you and ourselves, you will need to sign the -[Contributor License Agreement](https://cla.developers.google.com/clas). +[Contributor License Agreement](https://identity.linuxfoundation.org/projects/cncf). ## Guidelines for Pull Requests How to get your contributions merged smoothly and quickly. diff --git a/balancer.go b/balancer.go index ab65049ddc17..300da6c5e871 100644 --- a/balancer.go +++ b/balancer.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/naming" + "google.golang.org/grpc/status" ) // Address represents a server the client connects to. @@ -310,7 +311,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad if !opts.BlockingWait { if len(rr.addrs) == 0 { rr.mu.Unlock() - err = Errorf(codes.Unavailable, "there is no address available") + err = status.Errorf(codes.Unavailable, "there is no address available") return } // Returns the next addr on rr.addrs for failfast RPCs. diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 7f953ff002f2..322c4f073e36 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/grpclog/glogger" + "google.golang.org/grpc/internal" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -37,6 +38,7 @@ import ( "google.golang.org/grpc/test/leakcheck" ) + var rr = balancer.Get("round_robin") type testServer struct { @@ -111,7 +113,7 @@ func TestOneBackend(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -143,7 +145,7 @@ func TestBackendsRoundRobin(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -202,7 +204,7 @@ func TestAddressesRemoved(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -216,7 +218,7 @@ func TestAddressesRemoved(t *testing.T) { for i := 0; i < 1000; i++ { ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) == codes.DeadlineExceeded { return } time.Sleep(time.Millisecond) @@ -248,7 +250,7 @@ func TestCloseWithPendingRPC(t *testing.T) { defer wg.Done() // This RPC blocks until cc is closed. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); internal.Code(err) == codes.DeadlineExceeded { t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") } cancel() @@ -278,7 +280,7 @@ func TestNewAddressWhileBlocking(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -327,7 +329,7 @@ func TestOneServerDown(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -424,7 +426,7 @@ func TestAllServersDown(t *testing.T) { // The first RPC should fail because there's no address. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -468,7 +470,7 @@ func TestAllServersDown(t *testing.T) { } time.Sleep(100 * time.Millisecond) for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable { + if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) == codes.Unavailable { return } time.Sleep(time.Millisecond) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 395c4b6b4dea..001b2645cec9 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -98,7 +98,7 @@ type ccBalancerWrapper struct { resolverUpdateCh chan *resolverUpdate done chan struct{} - mu sync.RWMutex + mu sync.Mutex subConns map[*acBalancerWrapper]struct{} } @@ -143,11 +143,13 @@ func (ccb *ccBalancerWrapper) watcher() { select { case <-ccb.done: ccb.balancer.Close() - ccb.mu.RLock() - for acbw := range ccb.subConns { + ccb.mu.Lock() + scs := ccb.subConns + ccb.subConns = nil + ccb.mu.Unlock() + for acbw := range scs { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } - ccb.mu.RUnlock() return default: } @@ -190,6 +192,11 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer if len(addrs) <= 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } + ccb.mu.Lock() + defer ccb.mu.Unlock() + if ccb.subConns == nil { + return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") + } ac, err := ccb.cc.newAddrConn(addrs) if err != nil { return nil, err @@ -198,9 +205,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer acbw.ac.mu.Lock() ac.acbw = acbw acbw.ac.mu.Unlock() - ccb.mu.Lock() ccb.subConns[acbw] = struct{}{} - ccb.mu.Unlock() return acbw, nil } @@ -210,12 +215,20 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { return } ccb.mu.Lock() + defer ccb.mu.Unlock() + if ccb.subConns == nil { + return + } delete(ccb.subConns, acbw) - ccb.mu.Unlock() ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + ccb.mu.Lock() + defer ccb.mu.Unlock() + if ccb.subConns == nil { + return + } ccb.cc.csMgr.updateState(s) ccb.cc.blockingpicker.updatePicker(p) } diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 92c196d388e1..d4e8c9899622 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -39,7 +39,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { ) connected := false for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); errorDesc(err) == servers[0].port { if connected { // connected is set to false if peer is not server[0]. So if // connected is true here, this is the second time we saw @@ -58,7 +58,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { // The following RPCs should all succeed with the first server. for i := 0; i < 3; i++ { err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc) - if ErrorDesc(err) != servers[0].port { + if errorDesc(err) != servers[0].port { return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[0].port, err) } } @@ -79,7 +79,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { for _, s := range servers { var up bool for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == s.port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); errorDesc(err) == s.port { up = true break } @@ -94,7 +94,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { serverCount := len(servers) for i := 0; i < 3*serverCount; i++ { err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc) - if ErrorDesc(err) != servers[i%serverCount].port { + if errorDesc(err) != servers[i%serverCount].port { return fmt.Errorf("Index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err) } } diff --git a/balancer_test.go b/balancer_test.go index a1558f027a59..a198d99fd893 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -35,6 +35,7 @@ import ( // V1 balancer tests use passthrough resolver instead of dns. // TODO(bar) remove this when removing v1 balaner entirely. _ "google.golang.org/grpc/resolver/passthrough" + "google.golang.org/grpc/internal" ) type testWatcher struct { @@ -128,7 +129,7 @@ func TestNameDiscovery(t *testing.T) { defer cc.Close() req := "port" var reply string - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) } // Inject the name resolution change to remove servers[0] and add servers[1]. @@ -144,7 +145,7 @@ func TestNameDiscovery(t *testing.T) { r.w.inject(updates) // Loop until the rpcs in flight talks to servers[1]. for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) @@ -204,7 +205,7 @@ func TestRoundRobin(t *testing.T) { var reply string // Loop until servers[1] is up for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) @@ -217,14 +218,14 @@ func TestRoundRobin(t *testing.T) { r.w.inject([]*naming.Update{u}) // Loop until both servers[2] are up. for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[2].port { break } time.Sleep(10 * time.Millisecond) } // Check the incoming RPCs served in a round-robin manner. for i := 0; i < 10; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[i%numServers].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port) } } @@ -252,7 +253,7 @@ func TestCloseWithPendingRPC(t *testing.T) { // Loop until the above update applies. for { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); internal.Code(err) == codes.DeadlineExceeded { cancel() break } @@ -300,7 +301,7 @@ func TestGetOnWaitChannel(t *testing.T) { for { var reply string ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); internal.Code(err) == codes.DeadlineExceeded { cancel() break } @@ -348,7 +349,7 @@ func TestOneServerDown(t *testing.T) { var reply string // Loop until servers[1] is up for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) @@ -401,7 +402,7 @@ func TestOneAddressRemoval(t *testing.T) { var reply string // Loop until servers[1] is up for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) @@ -450,7 +451,7 @@ func checkServerUp(t *testing.T, currentServer *server) { defer cc.Close() var reply string for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == port { break } time.Sleep(10 * time.Millisecond) @@ -511,7 +512,7 @@ func TestPickFirstCloseWithPendingRPC(t *testing.T) { // Loop until the above update applies. for { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); internal.Code(err) == codes.DeadlineExceeded { cancel() break } @@ -574,7 +575,7 @@ func TestPickFirstOrderAllServerUp(t *testing.T) { req := "port" var reply string for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) @@ -589,13 +590,13 @@ func TestPickFirstOrderAllServerUp(t *testing.T) { r.w.inject([]*naming.Update{u}) // Loop until it changes to server[1] for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -609,7 +610,7 @@ func TestPickFirstOrderAllServerUp(t *testing.T) { } r.w.inject([]*naming.Update{u}) for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -622,13 +623,13 @@ func TestPickFirstOrderAllServerUp(t *testing.T) { } r.w.inject([]*naming.Update{u}) for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[2].port { break } time.Sleep(1 * time.Second) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[2].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) } time.Sleep(10 * time.Millisecond) @@ -641,13 +642,13 @@ func TestPickFirstOrderAllServerUp(t *testing.T) { } r.w.inject([]*naming.Update{u}) for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(1 * time.Second) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) @@ -687,7 +688,7 @@ func TestPickFirstOrderOneServerDown(t *testing.T) { req := "port" var reply string for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) @@ -698,13 +699,13 @@ func TestPickFirstOrderOneServerDown(t *testing.T) { servers[0].stop() // Loop until it changes to server[1] for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(10 * time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -719,7 +720,7 @@ func TestPickFirstOrderOneServerDown(t *testing.T) { checkServerUp(t, servers[0]) for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -732,13 +733,13 @@ func TestPickFirstOrderOneServerDown(t *testing.T) { } r.w.inject([]*naming.Update{u}) for { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(1 * time.Second) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) diff --git a/balancer_v1_wrapper.go b/balancer_v1_wrapper.go index 6cb39071c1d8..0ffc31237104 100644 --- a/balancer_v1_wrapper.go +++ b/balancer_v1_wrapper.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" ) type balancerWrapperBuilder struct { @@ -317,12 +318,12 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) Metadata: a.Metadata, }] if !ok && failfast { - return nil, nil, Errorf(codes.Unavailable, "there is no connection available") + return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available") } if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) { // If the returned sc is not ready and RPC is failfast, // return error, and this RPC will fail. - return nil, nil, Errorf(codes.Unavailable, "there is no connection available") + return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available") } } diff --git a/benchmark/primitives/code_string_test.go b/benchmark/primitives/code_string_test.go new file mode 100644 index 000000000000..51b1ee48cf3c --- /dev/null +++ b/benchmark/primitives/code_string_test.go @@ -0,0 +1,135 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package primitives_test + +import ( + "strconv" + "testing" + + "google.golang.org/grpc/codes" +) + +type codeBench uint32 + +const ( + OK codeBench = iota + Canceled + Unknown + InvalidArgument + DeadlineExceeded + NotFound + AlreadyExists + PermissionDenied + ResourceExhausted + FailedPrecondition + Aborted + OutOfRange + Unimplemented + Internal + Unavailable + DataLoss + Unauthenticated +) + +// The following String() function was generated by stringer. +const _Code_name = "OKCanceledUnknownInvalidArgumentDeadlineExceededNotFoundAlreadyExistsPermissionDeniedResourceExhaustedFailedPreconditionAbortedOutOfRangeUnimplementedInternalUnavailableDataLossUnauthenticated" + +var _Code_index = [...]uint8{0, 2, 10, 17, 32, 48, 56, 69, 85, 102, 120, 127, 137, 150, 158, 169, 177, 192} + +func (i codeBench) String() string { + if i >= codeBench(len(_Code_index)-1) { + return "Code(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Code_name[_Code_index[i]:_Code_index[i+1]] +} + +var nameMap = map[codeBench]string{ + OK: "OK", + Canceled: "Canceled", + Unknown: "Unknown", + InvalidArgument: "InvalidArgument", + DeadlineExceeded: "DeadlineExceeded", + NotFound: "NotFound", + AlreadyExists: "AlreadyExists", + PermissionDenied: "PermissionDenied", + ResourceExhausted: "ResourceExhausted", + FailedPrecondition: "FailedPrecondition", + Aborted: "Aborted", + OutOfRange: "OutOfRange", + Unimplemented: "Unimplemented", + Internal: "Internal", + Unavailable: "Unavailable", + DataLoss: "DataLoss", + Unauthenticated: "Unauthenticated", +} + +func (i codeBench) StringUsingMap() string { + if s, ok := nameMap[i]; ok { + return s + } + return "Code(" + strconv.FormatInt(int64(i), 10) + ")" +} + +func BenchmarkCodeStringStringer(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + c := codeBench(uint32(i % 17)) + _ = c.String() + } + b.StopTimer() +} + +func BenchmarkCodeStringMap(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + c := codeBench(uint32(i % 17)) + _ = c.StringUsingMap() + } + b.StopTimer() +} + +// codes.Code.String() does a switch. +func BenchmarkCodeStringSwitch(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + c := codes.Code(uint32(i % 17)) + _ = c.String() + } + b.StopTimer() +} + +// Testing all codes (0<=c<=16) and also one overflow (17). +func BenchmarkCodeStringStringerWithOverflow(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + c := codeBench(uint32(i % 18)) + _ = c.String() + } + b.StopTimer() +} + +// Testing all codes (0<=c<=16) and also one overflow (17). +func BenchmarkCodeStringSwitchWithOverflow(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + c := codes.Code(uint32(i % 18)) + _ = c.String() + } + b.StopTimer() +} diff --git a/call.go b/call.go index 9c4b25518a64..14216c42bd2e 100644 --- a/call.go +++ b/call.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" + "google.golang.org/grpc/status" ) // recvResponse receives and parses an RPC response. @@ -59,7 +60,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran } for { if c.maxReceiveMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } // Set dc if it exists and matches the message compression type used, @@ -113,7 +114,7 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, compressor = nil // Disable the legacy compressor. comp = encoding.GetCompressor(ct) if comp == nil { - return Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", ct) + return status.Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", ct) } } hdr, data, err := encode(dopts.codec, args, compressor, outPayload, comp) @@ -121,10 +122,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, return err } if c.maxSendMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } if len(data) > *c.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) + return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) } err = t.Write(stream, hdr, data, opts) if err == nil && outPayload != nil { diff --git a/call_test.go b/call_test.go index f48d30e879b5..6b12fef42ba2 100644 --- a/call_test.go +++ b/call_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/test/leakcheck" "google.golang.org/grpc/transport" + "google.golang.org/grpc/internal" ) var ( @@ -233,7 +234,7 @@ func TestInvokeLargeErr(t *testing.T) { if _, ok := status.FromError(err); !ok { t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.") } - if Code(err) != codes.Internal || len(ErrorDesc(err)) != sizeLargeErr { + if internal.Code(err) != codes.Internal || len(errorDesc(err)) != sizeLargeErr { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want an error of code %d and desc size %d", err, codes.Internal, sizeLargeErr) } cc.Close() @@ -250,7 +251,7 @@ func TestInvokeErrorSpecialChars(t *testing.T) { if _, ok := status.FromError(err); !ok { t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.") } - if got, want := ErrorDesc(err), weirdError; got != want { + if got, want := errorDesc(err), weirdError; got != want { t.Fatalf("grpc.Invoke(_, _, _, _, _) error = %q, want %q", got, want) } cc.Close() diff --git a/clientconn.go b/clientconn.go index 4a98265a046f..c072f69f0270 100644 --- a/clientconn.go +++ b/clientconn.go @@ -98,8 +98,10 @@ type dialOptions struct { // This is to support v1 balancer. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder - waitForHandshake bool + resolverBuilder resolver.Builder + // Custom user options for resolver.Build. + resolverBuildUserOptions interface{} + waitForHandshake bool } const ( @@ -223,6 +225,14 @@ func withResolverBuilder(b resolver.Builder) DialOption { } } +// WithResolverUserOptions returns a DialOption which sets the UserOptions +// field of resolver's BuildOption. +func WithResolverUserOptions(userOpt interface{}) DialOption { + return func(o *dialOptions) { + o.resolverBuildUserOptions = userOpt + } +} + // WithServiceConfig returns a DialOption which has a channel to read the service configuration. // DEPRECATED: service config should be received through name resolver, as specified here. // https://github.com/grpc/grpc/blob/master/doc/service_config.md diff --git a/clientconn_test.go b/clientconn_test.go index 2142ff15d095..512a9462ee9e 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -583,41 +583,6 @@ func TestNonblockingDialWithEmptyBalancer(t *testing.T) { } } -func TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) { - defer leakcheck.Check(t) - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure()) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - // SwitchBalancer before NewAddress. There was no balancer created, this - // makes sure we don't call close on nil balancerWrapper. - r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic. - - time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn. -} - -func TestResolverEmptyUpdateNotPanic(t *testing.T) { - defer leakcheck.Check(t) - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure()) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - // This make sure we don't create addrConn with empty address list. - r.NewAddress([]resolver.Address{}) // This should not panic. - - time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn. -} - func TestClientUpdatesParamsAfterGoAway(t *testing.T) { defer leakcheck.Check(t) lis, err := net.Listen("tcp", "localhost:0") diff --git a/codes/code_string.go b/codes/code_string.go index d9cf9675b1e6..0b206a57822a 100644 --- a/codes/code_string.go +++ b/codes/code_string.go @@ -1,16 +1,62 @@ -// Code generated by "stringer -type=Code"; DO NOT EDIT. +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ package codes import "strconv" -const _Code_name = "OKCanceledUnknownInvalidArgumentDeadlineExceededNotFoundAlreadyExistsPermissionDeniedResourceExhaustedFailedPreconditionAbortedOutOfRangeUnimplementedInternalUnavailableDataLossUnauthenticated" - -var _Code_index = [...]uint8{0, 2, 10, 17, 32, 48, 56, 69, 85, 102, 120, 127, 137, 150, 158, 169, 177, 192} - -func (i Code) String() string { - if i >= Code(len(_Code_index)-1) { - return "Code(" + strconv.FormatInt(int64(i), 10) + ")" +func (c Code) String() string { + switch c { + case OK: + return "OK" + case Canceled: + return "Canceled" + case Unknown: + return "Unknown" + case InvalidArgument: + return "InvalidArgument" + case DeadlineExceeded: + return "DeadlineExceeded" + case NotFound: + return "NotFound" + case AlreadyExists: + return "AlreadyExists" + case PermissionDenied: + return "PermissionDenied" + case ResourceExhausted: + return "ResourceExhausted" + case FailedPrecondition: + return "FailedPrecondition" + case Aborted: + return "Aborted" + case OutOfRange: + return "OutOfRange" + case Unimplemented: + return "Unimplemented" + case Internal: + return "Internal" + case Unavailable: + return "Unavailable" + case DataLoss: + return "DataLoss" + case Unauthenticated: + return "Unauthenticated" + default: + return "Code(" + strconv.FormatInt(int64(c), 10) + ")" } - return _Code_name[_Code_index[i]:_Code_index[i+1]] } diff --git a/codes/codes.go b/codes/codes.go index 21e7733a5fe9..f3719d562801 100644 --- a/codes/codes.go +++ b/codes/codes.go @@ -19,12 +19,13 @@ // Package codes defines the canonical error codes used by gRPC. It is // consistent across various languages. package codes // import "google.golang.org/grpc/codes" +import ( + "fmt" +) // A Code is an unsigned 32-bit error code as defined in the gRPC spec. type Code uint32 -//go:generate stringer -type=Code - const ( // OK is returned on success. OK Code = 0 @@ -142,3 +143,41 @@ const ( // DataLoss indicates unrecoverable data loss or corruption. DataLoss Code = 15 ) + +var strToCode = map[string]Code{ + `"OK"`: OK, + `"CANCELLED"`:/* [sic] */ Canceled, + `"UNKNOWN"`: Unknown, + `"INVALID_ARGUMENT"`: InvalidArgument, + `"DEADLINE_EXCEEDED"`: DeadlineExceeded, + `"NOT_FOUND"`: NotFound, + `"ALREADY_EXISTS"`: AlreadyExists, + `"PERMISSION_DENIED"`: PermissionDenied, + `"RESOURCE_EXHAUSTED"`: ResourceExhausted, + `"FAILED_PRECONDITION"`: FailedPrecondition, + `"ABORTED"`: Aborted, + `"OUT_OF_RANGE"`: OutOfRange, + `"UNIMPLEMENTED"`: Unimplemented, + `"INTERNAL"`: Internal, + `"UNAVAILABLE"`: Unavailable, + `"DATA_LOSS"`: DataLoss, + `"UNAUTHENTICATED"`: Unauthenticated, +} + +// UnmarshalJSON unmarshals b into the Code. +func (c *Code) UnmarshalJSON(b []byte) error { + // From json.Unmarshaler: By convention, to approximate the behavior of + // Unmarshal itself, Unmarshalers implement UnmarshalJSON([]byte("null")) as + // a no-op. + if string(b) == "null" { + return nil + } + if c == nil { + return fmt.Errorf("nil receiver passed to UnmarshalJSON") + } + if jc, ok := strToCode[string(b)]; ok { + *c = jc + return nil + } + return fmt.Errorf("invalid code: %q", string(b)) +} diff --git a/codes/codes_test.go b/codes/codes_test.go new file mode 100644 index 000000000000..1e3b991848e5 --- /dev/null +++ b/codes/codes_test.go @@ -0,0 +1,64 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package codes + +import ( + "encoding/json" + "reflect" + "testing" + + cpb "google.golang.org/genproto/googleapis/rpc/code" +) + +func TestUnmarshalJSON(t *testing.T) { + for s, v := range cpb.Code_value { + want := Code(v) + var got Code + if err := got.UnmarshalJSON([]byte(`"` + s + `"`)); err != nil || got != want { + t.Errorf("got.UnmarshalJSON(%q) = %v; want . got=%v; want %v", s, err, got, want) + } + } +} + +func TestJSONUnmarshal(t *testing.T) { + var got []Code + want := []Code{OK, NotFound, Internal, Canceled} + in := `["OK", "NOT_FOUND", "INTERNAL", "CANCELLED"]` + err := json.Unmarshal([]byte(in), &got) + if err != nil || !reflect.DeepEqual(got, want) { + t.Fatalf("json.Unmarshal(%q, &got) = %v; want . got=%v; want %v", in, err, got, want) + } +} + +func TestUnmarshalJSON_NilReceiver(t *testing.T) { + var got *Code + in := OK.String() + if err := got.UnmarshalJSON([]byte(in)); err == nil { + t.Errorf("got.UnmarshalJSON(%q) = nil; want . got=%v", in, got) + } +} + +func TestUnmarshalJSON_UnknownInput(t *testing.T) { + var got Code + for _, in := range [][]byte{[]byte(""), []byte("xxx"), []byte("Code(17)"), nil} { + if err := got.UnmarshalJSON([]byte(in)); err == nil { + t.Errorf("got.UnmarshalJSON(%q) = nil; want . got=%v", in, got) + } + } +} diff --git a/examples/route_guide/README.md b/examples/route_guide/README.md index 7f009d5caf92..ddec3a0bb5b8 100644 --- a/examples/route_guide/README.md +++ b/examples/route_guide/README.md @@ -2,7 +2,7 @@ The route guide server and client demonstrate how to use grpc go libraries to perform unary, client streaming, server streaming and full duplex RPCs. -Please refer to [gRPC Basics: Go] (https://grpc.io/docs/tutorials/basic/go.html) for more information. +Please refer to [gRPC Basics: Go](https://grpc.io/docs/tutorials/basic/go.html) for more information. See the definition of the route guide service in routeguide/route_guide.proto. diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 9bab731c6f53..569111acbf7e 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -34,6 +34,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/internal" "golang.org/x/net/context" "google.golang.org/grpc" @@ -485,7 +486,7 @@ func TestDropRequest(t *testing.T) { for i := 0; i < 3; i++ { // Even RPCs should fail, because the 2st backend has // DropForLoadBalancing set to true. - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); grpc.Code(err) != codes.Unavailable { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(failfast)); internal.Code(err) != codes.Unavailable { t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable) } // Odd RPCs should succeed since they choose the non-drop-request diff --git a/internal/internal.go b/internal/internal.go index 53f1775201ce..b221110b2169 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -19,9 +19,23 @@ // the godoc of the top-level grpc package. package internal +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + // TestingUseHandlerImpl enables the http.Handler-based server implementation. // It must be called before Serve and requires TLS credentials. // // The provided grpcServer must be of type *grpc.Server. It is untyped // for circular dependency reasons. var TestingUseHandlerImpl func(grpcServer interface{}) + +// Code returns the error code for err if it was produced by the rpc system. +// Otherwise, it returns codes.Unknown. +func Code(err error) codes.Code { + if s, ok := status.FromError(err); ok { + return s.Code() + } + return codes.Unknown +} diff --git a/interop/http2/negative_http2_client.go b/interop/http2/negative_http2_client.go index 8f94edd4ae32..45ed85065902 100644 --- a/interop/http2/negative_http2_client.go +++ b/interop/http2/negative_http2_client.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" "google.golang.org/grpc/interop" testpb "google.golang.org/grpc/interop/grpc_testing" ) @@ -77,8 +78,8 @@ func rstAfterHeader(tc testpb.TestServiceClient) { if reply != nil { grpclog.Fatalf("Client received reply despite server sending rst stream after header") } - if grpc.Code(err) != codes.Internal { - grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Internal) + if internal.Code(err) != codes.Internal { + grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, internal.Code(err), codes.Internal) } } @@ -88,8 +89,8 @@ func rstDuringData(tc testpb.TestServiceClient) { if reply != nil { grpclog.Fatalf("Client received reply despite server sending rst stream during data") } - if grpc.Code(err) != codes.Unknown { - grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Unknown) + if internal.Code(err) != codes.Unknown { + grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, internal.Code(err), codes.Unknown) } } @@ -99,8 +100,8 @@ func rstAfterData(tc testpb.TestServiceClient) { if reply != nil { grpclog.Fatalf("Client received reply despite server sending rst stream after data") } - if grpc.Code(err) != codes.Internal { - grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Internal) + if internal.Code(err) != codes.Internal { + grpclog.Fatalf("%v.UnaryCall() = _, %v, want _, %v", tc, internal.Code(err), codes.Internal) } } diff --git a/interop/test_utils.go b/interop/test_utils.go index b1534b3fc69b..252473b3d226 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -231,7 +232,7 @@ func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOpt defer cancel() stream, err := tc.FullDuplexCall(ctx, args...) if err != nil { - if grpc.Code(err) == codes.DeadlineExceeded { + if internal.Code(err) == codes.DeadlineExceeded { return } grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) @@ -242,11 +243,11 @@ func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOpt Payload: pl, } if err := stream.Send(req); err != nil { - if grpc.Code(err) != codes.DeadlineExceeded { + if internal.Code(err) != codes.DeadlineExceeded { grpclog.Fatalf("%v.Send(_) = %v", stream, err) } } - if _, err := stream.Recv(); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := stream.Recv(); internal.Code(err) != codes.DeadlineExceeded { grpclog.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded) } } @@ -409,8 +410,8 @@ func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) { } cancel() _, err = stream.CloseAndRecv() - if grpc.Code(err) != codes.Canceled { - grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, grpc.Code(err), codes.Canceled) + if internal.Code(err) != codes.Canceled { + grpclog.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, internal.Code(err), codes.Canceled) } } @@ -439,8 +440,8 @@ func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOp grpclog.Fatalf("%v.Recv() = %v", stream, err) } cancel() - if _, err := stream.Recv(); grpc.Code(err) != codes.Canceled { - grpclog.Fatalf("%v compleled with error code %d, want %d", stream, grpc.Code(err), codes.Canceled) + if _, err := stream.Recv(); internal.Code(err) != codes.Canceled { + grpclog.Fatalf("%v compleled with error code %d, want %d", stream, internal.Code(err), codes.Canceled) } } @@ -568,15 +569,15 @@ func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption // DoUnimplementedService attempts to call a method from an unimplemented service. func DoUnimplementedService(tc testpb.UnimplementedServiceClient) { _, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{}) - if grpc.Code(err) != codes.Unimplemented { - grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, grpc.Code(err), codes.Unimplemented) + if internal.Code(err) != codes.Unimplemented { + grpclog.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, internal.Code(err), codes.Unimplemented) } } // DoUnimplementedMethod attempts to call an unimplemented method. func DoUnimplementedMethod(cc *grpc.ClientConn) { var req, reply proto.Message - if err := grpc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply, cc); err == nil || grpc.Code(err) != codes.Unimplemented { + if err := grpc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply, cc); err == nil || internal.Code(err) != codes.Unimplemented { grpclog.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented) } } diff --git a/naming/go17.go b/naming/go17.go index a537b08c602b..57b65d7b8898 100644 --- a/naming/go17.go +++ b/naming/go17.go @@ -1,4 +1,4 @@ -// +build go1.6, !go1.8 +// +build go1.6,!go1.8 /* * diff --git a/pickfirst_test.go b/pickfirst_test.go index e58b3422c149..bdf863d328a3 100644 --- a/pickfirst_test.go +++ b/pickfirst_test.go @@ -26,11 +26,20 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" "google.golang.org/grpc/test/leakcheck" ) +func errorDesc(err error) string { + if s, ok := status.FromError(err); ok { + return s.Message() + } + return err.Error() +} + func TestOneBackendPickfirst(t *testing.T) { defer leakcheck.Check(t) r, rcleanup := manual.GenerateAndRegisterManualResolver() @@ -50,14 +59,14 @@ func TestOneBackendPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.NewAddress([]resolver.Address{{Addr: servers[0].addr}}) // The second RPC should succeed. for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { return } time.Sleep(time.Millisecond) @@ -84,14 +93,14 @@ func TestBackendsPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) // The second RPC should succeed with the first server. for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { return } time.Sleep(time.Millisecond) @@ -118,7 +127,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -155,7 +164,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } @@ -192,14 +201,14 @@ func TestOneServerDownPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) // The second RPC should succeed with the first server. for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(time.Millisecond) @@ -207,7 +216,7 @@ func TestOneServerDownPickfirst(t *testing.T) { servers[0].stop() for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { return } time.Sleep(time.Millisecond) @@ -234,14 +243,14 @@ func TestAllServersDownPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) // The second RPC should succeed with the first server. for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(time.Millisecond) @@ -251,7 +260,7 @@ func TestAllServersDownPickfirst(t *testing.T) { servers[i].stop() } for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); Code(err) == codes.Unavailable { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); internal.Code(err) == codes.Unavailable { return } time.Sleep(time.Millisecond) @@ -278,19 +287,19 @@ func TestAddressesRemovedPickfirst(t *testing.T) { defer cancel() req := "port" var reply string - if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || Code(err) != codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil || internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}}) for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) @@ -299,13 +308,13 @@ func TestAddressesRemovedPickfirst(t *testing.T) { // Remove server[0]. r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}) for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[1].port { break } time.Sleep(time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -314,7 +323,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) { // Append server[0], nothing should change. r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}}) for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[1].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) } time.Sleep(10 * time.Millisecond) @@ -323,13 +332,13 @@ func TestAddressesRemovedPickfirst(t *testing.T) { // Remove server[1]. r.NewAddress([]resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}}) for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[2].port { break } time.Sleep(time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[2].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) } time.Sleep(10 * time.Millisecond) @@ -338,13 +347,13 @@ func TestAddressesRemovedPickfirst(t *testing.T) { // Remove server[2]. r.NewAddress([]resolver.Address{{Addr: servers[0].addr}}) for i := 0; i < 1000; i++ { - if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port { + if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && errorDesc(err) == servers[0].port { break } time.Sleep(time.Millisecond) } for i := 0; i < 20; i++ { - if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port { + if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || errorDesc(err) != servers[0].port { t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) } time.Sleep(10 * time.Millisecond) diff --git a/resolver/resolver.go b/resolver/resolver.go index 9efcffb3aa06..df097eedf79a 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -90,6 +90,9 @@ type Address struct { // BuildOption includes additional information for the builder to create // the resolver. type BuildOption struct { + // UserOptions can be used to pass configuration between DialOptions and the + // resolver. + UserOptions interface{} } // ClientConn contains the callbacks for resolver to notify any updates diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 1a1591e8f62b..ef5d4c286926 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -83,7 +83,9 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { } var err error - ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{}) + ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{ + UserOptions: cc.dopts.resolverBuildUserOptions, + }) if err != nil { return nil, err } diff --git a/resolver_test.go b/resolver_test.go new file mode 100644 index 000000000000..6aba13c1d76e --- /dev/null +++ b/resolver_test.go @@ -0,0 +1,99 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "fmt" + "strings" + "testing" + "time" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/test/leakcheck" +) + +func TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure()) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + // SwitchBalancer before NewAddress. There was no balancer created, this + // makes sure we don't call close on nil balancerWrapper. + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) // This should not panic. + + time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn. +} + +func TestResolverEmptyUpdateNotPanic(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure()) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + // This make sure we don't create addrConn with empty address list. + r.NewAddress([]resolver.Address{}) // This should not panic. + + time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn. +} + +var ( + errTestResolverFailBuild = fmt.Errorf("test resolver build error") +) + +type testResolverFailBuilder struct { + buildOpt resolver.BuildOption +} + +func (r *testResolverFailBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { + r.buildOpt = opts + return nil, errTestResolverFailBuild +} +func (r *testResolverFailBuilder) Scheme() string { + return "testResolverFailBuilderScheme" +} + +// Tests that options in WithResolverUserOptions are passed to resolver.Build(). +func TestResolverUserOptions(t *testing.T) { + r := &testResolverFailBuilder{} + + userOpt := "testUserOpt" + _, err := Dial("scheme:///test.server", WithInsecure(), + withResolverBuilder(r), + WithResolverUserOptions(userOpt), + ) + if err == nil || !strings.Contains(err.Error(), errTestResolverFailBuild.Error()) { + t.Fatalf("Dial with testResolverFailBuilder returns err: %v, want: %v", err, errTestResolverFailBuild) + } + + if r.buildOpt.UserOptions != userOpt { + t.Fatalf("buildOpt.UserOptions = %T %+v, want %v", r.buildOpt.UserOptions, r.buildOpt.UserOptions, userOpt) + } +} diff --git a/rpc_util.go b/rpc_util.go index 6ab195c2dc73..0fe501f05a05 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -293,10 +293,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt return pf, nil, nil } if int64(length) > int64(maxInt) { - return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) + return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) } if int(length) > maxReceiveMessageSize { - return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) + return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead // of making it for each message: @@ -326,7 +326,7 @@ func encode(c Codec, msg interface{}, cp Compressor, outPayload *stats.OutPayloa var err error b, err = c.Marshal(msg) if err != nil { - return nil, nil, Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) + return nil, nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) } if outPayload != nil { outPayload.Payload = msg @@ -340,20 +340,20 @@ func encode(c Codec, msg interface{}, cp Compressor, outPayload *stats.OutPayloa if compressor != nil { z, _ := compressor.Compress(cbuf) if _, err := z.Write(b); err != nil { - return nil, nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) + return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) } z.Close() } else { // If Compressor is not set by UseCompressor, use default Compressor if err := cp.Do(cbuf, b); err != nil { - return nil, nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) + return nil, nil, status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) } } b = cbuf.Bytes() } } if uint(len(b)) > math.MaxUint32 { - return nil, nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b)) + return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b)) } bufHeader := make([]byte, payloadLen+sizeLen) @@ -409,26 +409,26 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ if dc != nil { d, err = dc.Do(bytes.NewReader(d)) if err != nil { - return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } else { dcReader, err := compressor.Decompress(bytes.NewReader(d)) if err != nil { - return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } d, err = ioutil.ReadAll(dcReader) if err != nil { - return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } } if len(d) > maxReceiveMessageSize { // TODO: Revisit the error code. Currently keep it consistent with java // implementation. - return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) + return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) } if err := c.Unmarshal(d, m); err != nil { - return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) + return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } if inPayload != nil { inPayload.RecvTime = time.Now() @@ -458,7 +458,7 @@ func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) { // Code returns the error code for err if it was produced by the rpc system. // Otherwise, it returns codes.Unknown. // -// Deprecated; use status.FromError and Code method instead. +// Deprecated: use status.FromError and Code method instead. func Code(err error) codes.Code { if s, ok := status.FromError(err); ok { return s.Code() @@ -469,7 +469,7 @@ func Code(err error) codes.Code { // ErrorDesc returns the error description of err if it was produced by the rpc system. // Otherwise, it returns err.Error() or empty string when err is nil. // -// Deprecated; use status.FromError and Message method instead. +// Deprecated: use status.FromError and Message method instead. func ErrorDesc(err error) string { if s, ok := status.FromError(err); ok { return s.Message() @@ -480,7 +480,7 @@ func ErrorDesc(err error) string { // Errorf returns an error containing an error code and a description; // Errorf returns nil if c is OK. // -// Deprecated; use status.Errorf instead. +// Deprecated: use status.Errorf instead. func Errorf(c codes.Code, format string, a ...interface{}) error { return status.Errorf(c, format, a...) } diff --git a/server.go b/server.go index e9737fc499a2..504065f38ad4 100644 --- a/server.go +++ b/server.go @@ -826,7 +826,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return err } if err == io.ErrUnexpectedEOF { - err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) + err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) } if err != nil { if st, ok := status.FromError(err); ok { @@ -868,13 +868,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if dc != nil { req, err = dc.Do(bytes.NewReader(req)) if err != nil { - return Errorf(codes.Internal, err.Error()) + return status.Errorf(codes.Internal, err.Error()) } } else { tmp, _ := decomp.Decompress(bytes.NewReader(req)) req, err = ioutil.ReadAll(tmp) if err != nil { - return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) } } } @@ -1246,7 +1246,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) + return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetHeader(md) } @@ -1256,7 +1256,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error { func SendHeader(ctx context.Context, md metadata.MD) error { stream, ok := transport.StreamFromContext(ctx) if !ok { - return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) + return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } t := stream.ServerTransport() if t == nil { @@ -1276,7 +1276,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) + return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetTrailer(md) } diff --git a/server_test.go b/server_test.go index cd2f2c083827..4a7a524404a8 100644 --- a/server_test.go +++ b/server_test.go @@ -49,7 +49,7 @@ func TestStopBeforeServe(t *testing.T) { // server.Serve is responsible for closing the listener, even if the // server was already stopped. err = lis.Close() - if got, want := ErrorDesc(err), "use of closed"; !strings.Contains(got, want) { + if got, want := errorDesc(err), "use of closed"; !strings.Contains(got, want) { t.Errorf("Close() error = %q, want %q", got, want) } } diff --git a/stats/stats_test.go b/stats/stats_test.go index 0df23f3dc131..090de4f63b51 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -30,6 +30,7 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/internal" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" testpb "google.golang.org/grpc/stats/grpc_testing" @@ -64,10 +65,10 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* md, ok := metadata.FromIncomingContext(ctx) if ok { if err := grpc.SendHeader(ctx, md); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want ", md, err) + return nil, status.Errorf(internal.Code(err), "grpc.SendHeader(_, %v) = %v, want ", md, err) } if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) } } @@ -82,7 +83,7 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ md, ok := metadata.FromIncomingContext(stream.Context()) if ok { if err := stream.SendHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + return status.Errorf(internal.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) } stream.SetTrailer(testTrailerMetadata) } @@ -110,7 +111,7 @@ func (s *testServer) ClientStreamCall(stream testpb.TestService_ClientStreamCall md, ok := metadata.FromIncomingContext(stream.Context()) if ok { if err := stream.SendHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + return status.Errorf(internal.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) } stream.SetTrailer(testTrailerMetadata) } @@ -134,7 +135,7 @@ func (s *testServer) ServerStreamCall(in *testpb.SimpleRequest, stream testpb.Te md, ok := metadata.FromIncomingContext(stream.Context()) if ok { if err := stream.SendHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + return status.Errorf(internal.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) } stream.SetTrailer(testTrailerMetadata) } @@ -646,7 +647,14 @@ func checkEnd(t *testing.T, d *gotData, e *expectedData) { if st.EndTime.IsZero() { t.Fatalf("st.EndTime = %v, want ", st.EndTime) } - if grpc.Code(st.Error) != grpc.Code(e.err) || grpc.ErrorDesc(st.Error) != grpc.ErrorDesc(e.err) { + + actual, ok := status.FromError(st.Error) + if !ok { + t.Fatalf("expected st.Error to be a statusError, got %T", st.Error) + } + + expectedStatus, _ := status.FromError(e.err) + if actual.Code() != expectedStatus.Code() || actual.Message() != expectedStatus.Message() { t.Fatalf("st.Error = %v, want %v", st.Error, e.err) } } diff --git a/stream.go b/stream.go index f2ded47d4f86..f91381995102 100644 --- a/stream.go +++ b/stream.go @@ -163,7 +163,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if ct != encoding.Identity { comp = encoding.GetCompressor(ct) if comp == nil { - return nil, Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) + return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) } } } else if cc.dopts.cp != nil { @@ -400,10 +400,10 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { return err } if cs.c.maxSendMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } if len(data) > *cs.c.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) } err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false}) if err == nil && outPayload != nil { @@ -421,7 +421,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } } if cs.c.maxReceiveMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } if !cs.decompSet { // Block until we receive headers containing received message encoding. @@ -463,7 +463,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { // Special handling for client streaming rpc. // This recv expects EOF or errors, so we don't collect inPayload. if cs.c.maxReceiveMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) cs.closeTransportStream(err) @@ -660,7 +660,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { return err } if len(data) > ss.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) } if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil { return toRPCErr(err) @@ -700,7 +700,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { return err } if err == io.ErrUnexpectedEOF { - err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) + err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) } return toRPCErr(err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 43a53a19ba82..cb5cd1ccb381 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -159,29 +159,29 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (* } if s.setAndSendHeader { if err := grpc.SetHeader(ctx, md); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want ", md, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetHeader(_, %v) = %v, want ", md, err) } if err := grpc.SendHeader(ctx, testMetadata2); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want ", testMetadata2, err) + return nil, status.Errorf(internal.Code(err), "grpc.SendHeader(_, %v) = %v, want ", testMetadata2, err) } } else if s.setHeaderOnly { if err := grpc.SetHeader(ctx, md); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want ", md, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetHeader(_, %v) = %v, want ", md, err) } if err := grpc.SetHeader(ctx, testMetadata2); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetHeader(_, %v) = %v, want ", testMetadata2, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetHeader(_, %v) = %v, want ", testMetadata2, err) } } else { if err := grpc.SendHeader(ctx, md); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SendHeader(_, %v) = %v, want ", md, err) + return nil, status.Errorf(internal.Code(err), "grpc.SendHeader(_, %v) = %v, want ", md, err) } } if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata, err) } if s.multipleSetTrailer { if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil { - return nil, status.Errorf(grpc.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata2, err) + return nil, status.Errorf(internal.Code(err), "grpc.SetTrailer(_, %v) = %v, want ", testTrailerMetadata2, err) } } } @@ -278,21 +278,21 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ if ok { if s.setAndSendHeader { if err := stream.SetHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, md, err) + return status.Errorf(internal.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, md, err) } if err := stream.SendHeader(testMetadata2); err != nil { - return status.Errorf(grpc.Code(err), "%v.SendHeader(_, %v) = %v, want ", stream, testMetadata2, err) + return status.Errorf(internal.Code(err), "%v.SendHeader(_, %v) = %v, want ", stream, testMetadata2, err) } } else if s.setHeaderOnly { if err := stream.SetHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, md, err) + return status.Errorf(internal.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, md, err) } if err := stream.SetHeader(testMetadata2); err != nil { - return status.Errorf(grpc.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, testMetadata2, err) + return status.Errorf(internal.Code(err), "%v.SetHeader(_, %v) = %v, want ", stream, testMetadata2, err) } } else { if err := stream.SendHeader(md); err != nil { - return status.Errorf(grpc.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) + return status.Errorf(internal.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil) } } stream.SetTrailer(testTrailerMetadata) @@ -308,7 +308,7 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ } if err != nil { // to facilitate testSvrWriteStatusEarlyWrite - if grpc.Code(err) == codes.ResourceExhausted { + if internal.Code(err) == codes.ResourceExhausted { return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) } return err @@ -328,7 +328,7 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ Payload: payload, }); err != nil { // to facilitate testSvrWriteStatusEarlyWrite - if grpc.Code(err) == codes.ResourceExhausted { + if internal.Code(err) == codes.ResourceExhausted { return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error()) } return err @@ -786,7 +786,7 @@ func TestContextDeadlineNotIgnored(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() t1 := time.Now() - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err) } if time.Since(t1) > 2*time.Second { @@ -831,7 +831,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond) _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)) cancel() - if e.balancer != "" && grpc.Code(err) != codes.DeadlineExceeded { + if e.balancer != "" && internal.Code(err) != codes.DeadlineExceeded { // If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error, // the error will be an internal error. t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded) @@ -890,14 +890,14 @@ func testServerGoAway(t *testing.T, e env) { // Loop until the server side GoAway signal is propagated to the client. for { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && internal.Code(err) != codes.DeadlineExceeded { cancel() break } cancel() } // A new RPC should fail. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable && grpc.Code(err) != codes.Internal { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.Unavailable && internal.Code(err) != codes.Internal { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal) } <-ch @@ -1242,17 +1242,17 @@ func testFailFast(t *testing.T, e env) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) _, err := tc.EmptyCall(ctx, &testpb.Empty{}) cancel() - if grpc.Code(err) == codes.Unavailable { + if internal.Code(err) == codes.Unavailable { break } t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err) time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable) } - if _, err := tc.StreamingInputCall(context.Background()); grpc.Code(err) != codes.Unavailable { + if _, err := tc.StreamingInputCall(context.Background()); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable) } @@ -1329,7 +1329,7 @@ func TestGetMethodConfig(t *testing.T) { // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. var err error - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } @@ -1364,7 +1364,7 @@ func TestGetMethodConfig(t *testing.T) { time.Sleep(time.Millisecond) } // The following RPCs are expected to become fail-fast. - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -1410,10 +1410,10 @@ func TestServiceConfigWaitForReady(t *testing.T) { // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. var err error - if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } @@ -1446,10 +1446,10 @@ func TestServiceConfigWaitForReady(t *testing.T) { time.Sleep(time.Millisecond) } // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(context.Background()); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } } @@ -1496,13 +1496,13 @@ func TestServiceConfigTimeout(t *testing.T) { // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. var err error ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) - if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() @@ -1537,13 +1537,13 @@ func TestServiceConfigTimeout(t *testing.T) { } ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() @@ -1618,14 +1618,14 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { } // Test for unary RPC recv. - if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = extraLargePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1647,7 +1647,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1658,7 +1658,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err = stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } @@ -1687,14 +1687,14 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1708,7 +1708,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1719,7 +1719,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err = stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } @@ -1754,7 +1754,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { } req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1766,7 +1766,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { } req.Payload = extraLargePayload - if _, err = tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1790,7 +1790,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1805,7 +1805,7 @@ func TestServiceConfigMaxMsgSize(t *testing.T) { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } sreq.Payload = extraLargePayload - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err := stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } } @@ -1843,7 +1843,7 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) { Payload: smallPayload, } // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1866,7 +1866,7 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } } @@ -1913,14 +1913,14 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) { Payload: smallPayload, } // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1943,7 +1943,7 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1954,7 +1954,7 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err := stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } } @@ -1999,14 +1999,14 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { Payload: smallPayload, } // Test for unary RPC send. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC recv. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2029,7 +2029,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -2043,7 +2043,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } } @@ -2105,7 +2105,7 @@ func testTap(t *testing.T, e env) { ResponseSize: 45, Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); grpc.Code(err) != codes.Unavailable { + if _, err := tc.UnaryCall(context.Background(), req); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -2403,13 +2403,13 @@ func testExceedMsgLimit(t *testing.T, e env) { ResponseSize: smallSize, Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test on client side for unary RPC. req.ResponseSize = int32(*te.maxMsgSize) + 1 req.Payload = smallPayload - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -2437,7 +2437,7 @@ func testExceedMsgLimit(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -2451,7 +2451,7 @@ func testExceedMsgLimit(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -2549,7 +2549,7 @@ func testPeerFailedRPC(t *testing.T, e env) { } peer := new(peer.Peer) - if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } else { pa := peer.Addr.String() @@ -3018,7 +3018,7 @@ func testMalformedHTTP2Metadata(t *testing.T, e env) { Payload: payload, } ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata) - if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Internal { + if _, err := tc.UnaryCall(ctx, req); internal.Code(err) != codes.Internal { t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal) } } @@ -3078,7 +3078,7 @@ func testRetry(t *testing.T, e env) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(tc.failFast)) cancel() - if grpc.Code(err) != tc.errCode { + if internal.Code(err) != tc.errCode { t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode) } } @@ -3115,7 +3115,7 @@ func testRPCTimeout(t *testing.T, e env) { } for i := -1; i <= 10; i++ { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond) - if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.UnaryCall(ctx, req); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want , error code: %s", err, codes.DeadlineExceeded) } cancel() @@ -3153,7 +3153,7 @@ func testCancel(t *testing.T, e env) { } ctx, cancel := context.WithCancel(context.Background()) time.AfterFunc(1*time.Millisecond, cancel) - if r, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Canceled { + if r, err := tc.UnaryCall(ctx, req); internal.Code(err) != codes.Canceled { t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled) } awaitNewConnLogOutput() @@ -3198,7 +3198,7 @@ func testCancelNoIO(t *testing.T, e env) { if err == nil { continue } - if grpc.Code(err) == codes.DeadlineExceeded { + if internal.Code(err) == codes.DeadlineExceeded { break } t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) @@ -3247,7 +3247,7 @@ func testNoService(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if _, err := stream.Recv(); grpc.Code(err) != codes.Unimplemented { + if _, err := stream.Recv(); internal.Code(err) != codes.Unimplemented { t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented) } } @@ -3662,7 +3662,7 @@ func testClientStreamingError(t *testing.T, e env) { if err := stream.Send(req); err != io.EOF { continue } - if _, err := stream.CloseAndRecv(); grpc.Code(err) != codes.NotFound { + if _, err := stream.CloseAndRecv(); internal.Code(err) != codes.NotFound { t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound) } break @@ -3703,7 +3703,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { time.Sleep(50 * time.Millisecond) continue } - if grpc.Code(err) == codes.DeadlineExceeded { + if internal.Code(err) == codes.DeadlineExceeded { break } t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) @@ -3742,7 +3742,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) { time.Sleep(50 * time.Millisecond) continue } - if grpc.Code(err) == codes.DeadlineExceeded { + if internal.Code(err) == codes.DeadlineExceeded { break } t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) @@ -3766,7 +3766,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) { // No rpc should go through due to the max streams limit. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() - if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } }() @@ -3801,7 +3801,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) { ResponseSize: respSize, Payload: payload, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Unimplemented { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.Unimplemented { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented) } // Streaming RPC @@ -3826,7 +3826,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Unimplemented { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.Unimplemented { t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented) } } @@ -3973,7 +3973,7 @@ func testUnaryClientInterceptor(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.NotFound { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.NotFound { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound) } } @@ -4014,7 +4014,7 @@ func testStreamClientInterceptor(t *testing.T, e env) { ResponseParameters: respParam, Payload: payload, } - if _, err := tc.StreamingOutputCall(context.Background(), req); grpc.Code(err) != codes.NotFound { + if _, err := tc.StreamingOutputCall(context.Background(), req); internal.Code(err) != codes.NotFound { t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound) } } @@ -4037,7 +4037,7 @@ func testUnaryServerInterceptor(t *testing.T, e env) { defer te.tearDown() tc := testpb.NewTestServiceClient(te.clientConn()) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.PermissionDenied { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.PermissionDenied { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) } } @@ -4086,7 +4086,7 @@ func testStreamServerInterceptor(t *testing.T, e env) { if err != nil { t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, ", tc, err) } - if _, err := s1.Recv(); grpc.Code(err) != codes.PermissionDenied { + if _, err := s1.Recv(); internal.Code(err) != codes.PermissionDenied { t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied) } s2, err := tc.FullDuplexCall(context.Background()) @@ -4438,7 +4438,7 @@ func TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) { // This unary call should fail, but not timeout. ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); grpc.Code(err) != codes.Unavailable { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want ", err) } } @@ -4503,7 +4503,7 @@ func TestFlowControlLogicalRace(t *testing.T) { if err == io.EOF { break loop } - switch grpc.Code(err) { + switch internal.Code(err) { case codes.DeadlineExceeded: break loop default: @@ -5251,7 +5251,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send() = _, %v, want ", stream, err) } - if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } // Test send case: server sends a message larger than maxServerSendMsgSize. @@ -5265,7 +5265,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } } @@ -5314,7 +5314,7 @@ func testGetMethodConfigTD(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } @@ -5327,13 +5327,13 @@ func testGetMethodConfigTD(t *testing.T, e env) { ch <- sc // Wait for the new service config to propagate. for { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) == codes.DeadlineExceeded { continue } break } // The following RPCs are expected to become fail-fast. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } @@ -5365,10 +5365,10 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } @@ -5394,10 +5394,10 @@ func testServiceConfigWaitForReadyTD(t *testing.T, e env) { break } // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(context.Background()); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } } @@ -5429,12 +5429,12 @@ func testServiceConfigTimeoutTD(t *testing.T, e env) { tc := testpb.NewTestServiceClient(cc) // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() @@ -5462,13 +5462,13 @@ func testServiceConfigTimeoutTD(t *testing.T, e env) { } ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); internal.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() @@ -5525,14 +5525,14 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { Payload: smallPayload, } // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = extraLargePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -5554,7 +5554,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -5565,7 +5565,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err := stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } @@ -5582,14 +5582,14 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } // Test for unary RPC send. req.Payload = largePayload req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -5603,7 +5603,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -5614,7 +5614,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err := stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } @@ -5636,7 +5636,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { } req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -5648,7 +5648,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { } req.Payload = extraLargePayload - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := tc.UnaryCall(context.Background(), req); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -5672,7 +5672,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { if err := stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err := stream.Recv(); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -5687,7 +5687,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } sreq.Payload = extraLargePayload - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err := stream.Send(sreq); err == nil || internal.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } } diff --git a/vet.sh b/vet.sh index 8c5c5ba6dfb2..96fe4f7b7b96 100755 --- a/vet.sh +++ b/vet.sh @@ -23,8 +23,7 @@ if [ "$1" = "-install" ]; then golang.org/x/tools/cmd/goimports \ honnef.co/go/tools/cmd/staticcheck \ github.com/client9/misspell/cmd/misspell \ - github.com/golang/protobuf/protoc-gen-go \ - golang.org/x/tools/cmd/stringer + github.com/golang/protobuf/protoc-gen-go if [[ "$check_proto" = "true" ]]; then if [[ "$TRAVIS" = "true" ]]; then PROTOBUF_VERSION=3.3.0