clusterresolver: handle EDS nacks and resource-not-found errors correctly #6436
@@ -27,13 +27,21 @@ import (
	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"
	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
	"google.golang.org/grpc/xds/internal/xdsclient"
	"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
	"google.golang.org/protobuf/types/known/wrapperspb"

	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

@@ -771,3 +779,289 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
		t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
	}
}

// TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
// response for the EDS cluster and the test verifies that RPCs get routed to
// the EDS cluster. The management server then sends a bad EDS response. The
// test verifies that the cluster_resolver LB policy continues to use the
// previously received good update and that RPCs still get routed to the EDS
// cluster.
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
	defer cleanup1()

	// Start an xDS management server.
	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
	defer cleanup2()

	// Start two test backends and extract their host and port. The first
	// backend is used for the EDS cluster and the second backend is used for
	// the LOGICAL_DNS cluster.
	servers, cleanup3 := startTestServiceBackends(t, 2)
	defer cleanup3()
	addrs, ports := backendAddressesAndPorts(t, servers)

	// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
	// configure an endpoints resource for the EDS cluster.
	const (
		edsClusterName = clusterName + "-eds"
		dnsClusterName = clusterName + "-dns"
		dnsHostName    = "dns_host"
		dnsPort        = uint32(8080)
	)
	resources := e2e.UpdateOptions{
		NodeID: nodeID,
		Clusters: []*v3clusterpb.Cluster{
			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
		},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create xDS client, configure cds_experimental LB policy with a manual
	// resolver, and dial the test backends.
	cc, cleanup := setupAndDial(t, bootstrapContents)
	defer cleanup()

	// Ensure that the DNS resolver is started for the expected target.
	select {
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for DNS resolver to be started")
	case target := <-dnsTargetCh:
		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
		if got != want {
			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
		}
	}

	// Update DNS resolver with test backend addresses.
	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

	// Make an RPC and ensure that it gets routed to the first backend since the
	// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
	client := testgrpc.NewTestServiceClient(cc)
	peer := &peer.Peer{}
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}
	if peer.Addr.String() != addrs[0].Addr {
		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
	}

	// Push an EDS resource from the management server that is expected to be
	// NACKed by the xDS client. Since the cluster_resolver LB policy has a
	// previously received good EDS resource, it will continue to use that.
	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Ensure that RPCs continue to get routed to the EDS cluster for the next
	// second.
	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
			t.Fatalf("EmptyCall() failed: %v", err)
		}
		if peer.Addr.String() != addrs[0].Addr {
			t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
		}
	}
}

// TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
// scenario where the top-level cluster is an aggregate cluster that resolves to
// an EDS and LOGICAL_DNS cluster. The management server sends a bad EDS
// response. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
// as though it received an update with no endpoints.
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
	defer cleanup1()

	// Start an xDS management server.
	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
	defer cleanup2()

	// Start two test backends and extract their host and port. The first
	// backend is used for the EDS cluster and the second backend is used for
	// the LOGICAL_DNS cluster.
	servers, cleanup3 := startTestServiceBackends(t, 2)
	defer cleanup3()
	addrs, ports := backendAddressesAndPorts(t, servers)

	// Configure an aggregate cluster pointing to an EDS and DNS cluster.
	const (
		edsClusterName = clusterName + "-eds"
		dnsClusterName = clusterName + "-dns"
		dnsHostName    = "dns_host"
		dnsPort        = uint32(8080)
	)
	resources := e2e.UpdateOptions{
		NodeID: nodeID,
		Clusters: []*v3clusterpb.Cluster{
			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
		},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
		SkipValidation: true,
	}

	// Set a load balancing weight of 0 for the backend in the EDS resource.
	// This is expected to be NACKed by the xDS client. Since the
	// cluster_resolver LB policy has no previously received good EDS resource,
	// it will treat this as though it received an update with no endpoints.
	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create xDS client, configure cds_experimental LB policy with a manual
	// resolver, and dial the test backends.
	cc, cleanup := setupAndDial(t, bootstrapContents)
	defer cleanup()

	// Ensure that the DNS resolver is started for the expected target.
	select {
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for DNS resolver to be started")
	case target := <-dnsTargetCh:
		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
		if got != want {
			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
		}
	}

	// Update DNS resolver with test backend addresses.
	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})

	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
	peer := &peer.Peer{}
	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}
	if peer.Addr.String() != addrs[1].Addr {
		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
	}
}

// TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
// the top-level cluster is an aggregate cluster that resolves to an EDS and
// LOGICAL_DNS cluster. The management server does not respond with the EDS
// cluster. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster in this case.
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
	defer cleanup1()

	// Start an xDS management server.
	mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
	defer cleanup2()

	// Start a test backend for the LOGICAL_DNS cluster.
	server := stubserver.StartTestService(t, nil)
	defer server.Stop()

	// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
	// endpoints are configured for the EDS cluster.
	const (
		edsClusterName = clusterName + "-eds"
		dnsClusterName = clusterName + "-dns"
		dnsHostName    = "dns_host"
		dnsPort        = uint32(8080)
	)
	resources := e2e.UpdateOptions{
		NodeID: nodeID,
		Clusters: []*v3clusterpb.Cluster{
			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
		},
		SkipValidation: true,
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create an xDS client talking to the above management server, configured
	// with a short watch expiry timeout.
	xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
		XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
		NodeProto: &v3corepb.Node{Id: nodeID},
	}, defaultTestWatchExpiryTimeout, time.Duration(0))

Review comment: Optional, mainly musing. The test above polls for a second, and this one requires 500 milliseconds for the resource-not-found error to fire. Is there a way to write this that doesn't introduce testing time as such? I don't see a way around it, but curious about your thoughts.

Reply: I don't understand what you are referring to here. For RDS/EDS, the only way to force a resource-not-found error is for the watch timer to fire. And since this is an e2e test, there is no way for us to inject such an event other than actually waiting for it to happen; the only thing we can do here is pass a really low value for the watch timeout. Also, the code towards the end of this test, where it verifies fallback, does not actually depend on any timings: it simply verifies that an RPC gets routed to a backend in the DNS cluster before the test context expires. I don't see a way either :)

Reply: Wrt the timer, you could do something like https://github.com/grpc/grpc-go/blob/master/xds/internal/balancer/outlierdetection/balancer.go#L50 for time.AfterFunc, or https://github.com/grpc/grpc-go/pull/4270/files#diff-cfb871b425a217deb8602c43a41b39ed1378776d5a40bebee381fec0f231a011R51 for a newTimer. But yeah, probably not worth it in this scenario. I was also thinking of setting the 500 ms to something like 10 ms, since I don't think the length matters.
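For reference, the override pattern linked in the comment above can be sketched as follows; the names afterFunc and startWatchExpiryTimer are illustrative, not the actual xdsclient API. Production code arms the timer through a package-level variable that a test can replace:

```go
package watchsketch

import "time"

// afterFunc defaults to time.AfterFunc. Tests may override it so the
// watch-expiry timer fires deterministically instead of after a delay.
var afterFunc = time.AfterFunc

// startWatchExpiryTimer arms the timer that reports resource-not-found
// if the watched resource does not arrive within the expiry window.
func startWatchExpiryTimer(expiry time.Duration, onNotFound func()) *time.Timer {
	return afterFunc(expiry, onNotFound)
}
```

A test could then swap in a version that fires immediately and restore the original afterwards:

```go
// In a test (hypothetical): make the expiry callback run immediately.
defer func(orig func(time.Duration, func()) *time.Timer) { afterFunc = orig }(afterFunc)
afterFunc = func(_ time.Duration, f func()) *time.Timer {
	f()                             // fire the resource-not-found path now
	return time.NewTimer(time.Hour) // inert timer so Stop() still works
}
```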

	if err != nil {
		t.Fatalf("failed to create xds client: %v", err)
	}
	defer close()

	// Create a manual resolver and push a service config specifying the use of
	// the cds LB policy as the top-level LB policy, and a corresponding config
	// with a single cluster.
	r := manual.NewBuilderWithScheme("whatever")
	jsonSC := fmt.Sprintf(`{
			"loadBalancingConfig":[{
				"cds_experimental":{
					"cluster": "%s"
				}
			}]
		}`, clusterName)
	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))

	// Create a ClientConn.
	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	// Make an RPC with a short deadline. We expect this RPC to not succeed
	// because the DNS resolver has not responded with endpoint addresses.
	client := testgrpc.NewTestServiceClient(cc)
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
	}

	// Ensure that the DNS resolver is started for the expected target.
	select {
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for DNS resolver to be started")
	case target := <-dnsTargetCh:
		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
		if got != want {
			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
		}
	}

	// Update DNS resolver with test backend addresses.
	dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})

	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
	// Even though the EDS cluster is of higher priority, since the management
	// server does not respond with an EDS resource, the cluster_resolver LB
	// policy is expected to fall back to the LOGICAL_DNS cluster once the
	// watch timeout expires.
	peer := &peer.Peer{}
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}
	if peer.Addr.String() != server.Address {
		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
	}
}