@@ -20,7 +20,9 @@ package test
20
20
21
21
import (
22
22
"context"
23
+ "errors"
23
24
"fmt"
25
+ "strings"
24
26
"testing"
25
27
"time"
26
28
@@ -788,3 +790,219 @@ func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) {
788
790
t .Fatal (err )
789
791
}
790
792
}
793
+
794
+ // Tests the case where the pick_first LB policy receives an error from the name
795
+ // resolver without previously receiving a good update. Verifies that the
796
+ // channel moves to TRANSIENT_FAILURE and that error received from the name
797
+ // resolver is propagated to the caller of an RPC.
798
+ func (s ) TestPickFirst_ResolverError_NoPreviousUpdate (t * testing.T ) {
799
+ cc , r , _ := setupPickFirst (t , 0 )
800
+
801
+ nrErr := errors .New ("error from name resolver" )
802
+ r .ReportError (nrErr )
803
+
804
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
805
+ defer cancel ()
806
+ awaitState (ctx , t , cc , connectivity .TransientFailure )
807
+
808
+ client := testgrpc .NewTestServiceClient (cc )
809
+ _ , err := client .EmptyCall (ctx , & testpb.Empty {})
810
+ if err == nil {
811
+ t .Fatalf ("EmptyCall() succeeded when expected to fail with error: %v" , nrErr )
812
+ }
813
+ if ! strings .Contains (err .Error (), nrErr .Error ()) {
814
+ t .Fatalf ("EmptyCall() failed with error: %v, want error: %v" , err , nrErr )
815
+ }
816
+ }
817
+
818
+ // Tests the case where the pick_first LB policy receives an error from the name
819
+ // resolver after receiving a good update (and the channel is currently READY).
820
+ // The test verifies that the channel continues to use the previously received
821
+ // good update.
822
+ func (s ) TestPickFirst_ResolverError_WithPreviousUpdate_Ready (t * testing.T ) {
823
+ cc , r , backends := setupPickFirst (t , 1 )
824
+
825
+ addrs := stubBackendsToResolverAddrs (backends )
826
+ r .UpdateState (resolver.State {Addresses : addrs })
827
+
828
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
829
+ defer cancel ()
830
+ if err := pickfirst .CheckRPCsToBackend (ctx , cc , addrs [0 ]); err != nil {
831
+ t .Fatal (err )
832
+ }
833
+
834
+ nrErr := errors .New ("error from name resolver" )
835
+ r .ReportError (nrErr )
836
+
837
+ // Ensure that RPCs continue to succeed for the next second.
838
+ client := testgrpc .NewTestServiceClient (cc )
839
+ for end := time .Now ().Add (time .Second ); time .Now ().Before (end ); <- time .After (defaultTestShortTimeout ) {
840
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
841
+ t .Fatalf ("EmptyCall() failed: %v" , err )
842
+ }
843
+ }
844
+ }
845
+
846
+ // Tests the case where the pick_first LB policy receives an error from the name
847
+ // resolver after receiving a good update (and the channel is currently in
848
+ // CONNECTING state). The test verifies that the channel continues to use the
849
+ // previously received good update, and that RPCs don't fail with the error
850
+ // received from the name resolver.
851
+ func (s ) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting (t * testing.T ) {
852
+ lis , err := testutils .LocalTCPListener ()
853
+ if err != nil {
854
+ t .Fatalf ("net.Listen() failed: %v" , err )
855
+ }
856
+
857
+ // Listen on a local port and act like a server that blocks until the
858
+ // channel reaches CONNECTING and closes the connection without sending a
859
+ // server preface.
860
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
861
+ defer cancel ()
862
+ waitForConnecting := make (chan struct {})
863
+ go func () {
864
+ conn , err := lis .Accept ()
865
+ if err != nil {
866
+ t .Errorf ("Unexpected error when accepting a connection: %v" , err )
867
+ }
868
+ defer conn .Close ()
869
+
870
+ select {
871
+ case <- waitForConnecting :
872
+ case <- ctx .Done ():
873
+ t .Error ("Timeout when waiting for channel to move to CONNECTING state" )
874
+ }
875
+ }()
876
+
877
+ r := manual .NewBuilderWithScheme ("whatever" )
878
+ dopts := []grpc.DialOption {
879
+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
880
+ grpc .WithResolvers (r ),
881
+ grpc .WithDefaultServiceConfig (pickFirstServiceConfig ),
882
+ }
883
+ cc , err := grpc .Dial (r .Scheme ()+ ":///test.server" , dopts ... )
884
+ if err != nil {
885
+ t .Fatalf ("grpc.Dial() failed: %v" , err )
886
+ }
887
+ t .Cleanup (func () { cc .Close () })
888
+
889
+ addrs := []resolver.Address {{Addr : lis .Addr ().String ()}}
890
+ r .UpdateState (resolver.State {Addresses : addrs })
891
+ awaitState (ctx , t , cc , connectivity .Connecting )
892
+
893
+ nrErr := errors .New ("error from name resolver" )
894
+ r .ReportError (nrErr )
895
+
896
+ // RPCs should fail with deadline exceed error as long as they are in
897
+ // CONNECTING and not the error returned by the name resolver.
898
+ client := testgrpc .NewTestServiceClient (cc )
899
+ sCtx , sCancel := context .WithTimeout (ctx , defaultTestShortTimeout )
900
+ defer sCancel ()
901
+ if _ , err := client .EmptyCall (sCtx , & testpb.Empty {}); ! strings .Contains (err .Error (), context .DeadlineExceeded .Error ()) {
902
+ t .Fatalf ("EmptyCall() failed with error: %v, want error: %v" , err , context .DeadlineExceeded )
903
+ }
904
+
905
+ // Closing this channel leads to closing of the connection by our listener.
906
+ // gRPC should see this as a connection error.
907
+ close (waitForConnecting )
908
+ awaitState (ctx , t , cc , connectivity .TransientFailure )
909
+ checkForConnectionError (ctx , t , cc )
910
+ }
911
+
912
+ // Tests the case where the pick_first LB policy receives an error from the name
913
+ // resolver after receiving a good update. The previous good update though has
914
+ // seen the channel move to TRANSIENT_FAILURE. The test verifies that the
915
+ // channel fails RPCs with the new error from the resolver.
916
+ func (s ) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure (t * testing.T ) {
917
+ lis , err := testutils .LocalTCPListener ()
918
+ if err != nil {
919
+ t .Fatalf ("net.Listen() failed: %v" , err )
920
+ }
921
+
922
+ // Listen on a local port and act like a server that closes the connection
923
+ // without sending a server preface.
924
+ go func () {
925
+ conn , err := lis .Accept ()
926
+ if err != nil {
927
+ t .Errorf ("Unexpected error when accepting a connection: %v" , err )
928
+ }
929
+ conn .Close ()
930
+ }()
931
+
932
+ r := manual .NewBuilderWithScheme ("whatever" )
933
+ dopts := []grpc.DialOption {
934
+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
935
+ grpc .WithResolvers (r ),
936
+ grpc .WithDefaultServiceConfig (pickFirstServiceConfig ),
937
+ }
938
+ cc , err := grpc .Dial (r .Scheme ()+ ":///test.server" , dopts ... )
939
+ if err != nil {
940
+ t .Fatalf ("grpc.Dial() failed: %v" , err )
941
+ }
942
+ t .Cleanup (func () { cc .Close () })
943
+
944
+ addrs := []resolver.Address {{Addr : lis .Addr ().String ()}}
945
+ r .UpdateState (resolver.State {Addresses : addrs })
946
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
947
+ defer cancel ()
948
+ awaitState (ctx , t , cc , connectivity .TransientFailure )
949
+ checkForConnectionError (ctx , t , cc )
950
+
951
+ // An error from the name resolver should result in RPCs failing with that
952
+ // error instead of the old error that caused the channel to move to
953
+ // TRANSIENT_FAILURE in the first place.
954
+ nrErr := errors .New ("error from name resolver" )
955
+ r .ReportError (nrErr )
956
+ client := testgrpc .NewTestServiceClient (cc )
957
+ for ; ctx .Err () == nil ; <- time .After (defaultTestShortTimeout ) {
958
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); strings .Contains (err .Error (), nrErr .Error ()) {
959
+ break
960
+ }
961
+ }
962
+ if ctx .Err () != nil {
963
+ t .Fatal ("Timeout when waiting for RPCs to fail with error returned by the name resolver" )
964
+ }
965
+ }
966
+
967
+ func checkForConnectionError (ctx context.Context , t * testing.T , cc * grpc.ClientConn ) {
968
+ t .Helper ()
969
+
970
+ // RPCs may fail on the client side in two ways, once the fake server closes
971
+ // the accepted connection:
972
+ // - writing the client preface succeeds, but not reading the server preface
973
+ // - writing the client preface fails
974
+ // In either case, we should see it fail with UNAVAILABLE.
975
+ client := testgrpc .NewTestServiceClient (cc )
976
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); status .Code (err ) != codes .Unavailable {
977
+ t .Fatalf ("EmptyCall() failed with error: %v, want code %v" , err , codes .Unavailable )
978
+ }
979
+ }
980
+
981
+ // Tests the case where the pick_first LB policy receives an update from the
982
+ // name resolver with no addresses after receiving a good update. The test
983
+ // verifies that the channel fails RPCs with an error indicating the fact that
984
+ // the name resolver returned no addresses.
985
+ func (s ) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate (t * testing.T ) {
986
+ cc , r , backends := setupPickFirst (t , 1 )
987
+
988
+ addrs := stubBackendsToResolverAddrs (backends )
989
+ r .UpdateState (resolver.State {Addresses : addrs })
990
+
991
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
992
+ defer cancel ()
993
+ if err := pickfirst .CheckRPCsToBackend (ctx , cc , addrs [0 ]); err != nil {
994
+ t .Fatal (err )
995
+ }
996
+
997
+ r .UpdateState (resolver.State {})
998
+ wantErr := "produced zero addresses"
999
+ client := testgrpc .NewTestServiceClient (cc )
1000
+ for ; ctx .Err () == nil ; <- time .After (defaultTestShortTimeout ) {
1001
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); strings .Contains (err .Error (), wantErr ) {
1002
+ break
1003
+ }
1004
+ }
1005
+ if ctx .Err () != nil {
1006
+ t .Fatal ("Timeout when waiting for RPCs to fail with error returned by the name resolver" )
1007
+ }
1008
+ }
0 commit comments