diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index dedd9218a181..6a60bc308a96 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -280,7 +280,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo // EDS resource was removed. No action needs to be taken for this, and we // should continue watching the same EDS resource. if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { - b.resourceWatcher.stop() + b.resourceWatcher.stop(false) } if b.child != nil { @@ -326,7 +326,7 @@ func (b *clusterResolverBalancer) run() { // Close results in stopping the endpoint resolvers and closing the // underlying child policy and is the only way to exit this goroutine. case <-b.closed.Done(): - b.resourceWatcher.stop() + b.resourceWatcher.stop(true) if b.child != nil { b.child.Close() diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index d544360bba63..f466dcca7bda 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -18,9 +18,10 @@ package e2e_test import ( "context" - "errors" "fmt" + "net" "sort" + "strconv" "strings" "testing" "time" @@ -28,6 +29,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/stubserver" @@ -317,34 +319,41 @@ func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T) } } +func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) { + t.Helper() + + host, p, err := net.SplitHostPort(addr) + if err != nil { + t.Fatalf("Invalid serving address: %v", addr) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Invalid serving port %q: %v", p, err) + } + return host, uint32(port) +} + // TestAggregateCluster_WithOneDNSCluster tests the case where the top-level // cluster resource is an aggregate cluster that resolves to a single // LOGICAL_DNS cluster. The test verifies that RPCs can be made to backends that // make up the LOGICAL_DNS cluster. func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, _ := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + host, port := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster. 
-	const (
-		dnsClusterName = clusterName + "-dns"
-		dnsHostName    = "dns_host"
-		dnsPort        = uint32(8080)
-	)
+	const dnsClusterName = clusterName + "-dns"
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
 		Clusters: []*v3clusterpb.Cluster{
 			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
-			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
+			makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)),
 		},
 		SkipValidation: true,
 	}
@@ -359,19 +368,93 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
 	cc, cleanup := setupAndDial(t, bootstrapContents)
 	defer cleanup()
 
-	// Ensure that the DNS resolver is started for the expected target.
-	select {
-	case <-ctx.Done():
-		t.Fatal("Timeout when waiting for DNS resolver to be started")
-	case target := <-dnsTargetCh:
-		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
-		if got != want {
-			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
+	// Make an RPC and ensure that it gets routed to the first backend since the
+	// child policy for a LOGICAL_DNS cluster is pick_first by default.
+	client := testgrpc.NewTestServiceClient(cc)
+	peer := &peer.Peer{}
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
+		t.Fatalf("EmptyCall() failed: %v", err)
+	}
+	if peer.Addr.String() != server.Address {
+		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
+	}
+}
+
+// Tests the case where the top-level cluster resource is an aggregate cluster
+// that resolves to a single LOGICAL_DNS cluster. The specified DNS hostname is
+// expected to fail URL parsing. The test verifies that the channel moves to
+// TRANSIENT_FAILURE.
+func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) {
+	// Start an xDS management server.
+	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+	defer cleanup2()
+
+	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
+	const dnsClusterName = clusterName + "-dns"
+	resources := e2e.UpdateOptions{
+		NodeID: nodeID,
+		Clusters: []*v3clusterpb.Cluster{
+			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
+			makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)),
+		},
+		SkipValidation: true,
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create xDS client, configure cds_experimental LB policy with a manual
+	// resolver, and dial the test backends.
+	cc, cleanup := setupAndDial(t, bootstrapContents)
+	defer cleanup()
+
+	// Ensure that the ClientConn moves to TransientFailure.
+	for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
 		}
 	}
+}
 
-	// Update DNS resolver with test backend addresses.
-	dnsR.UpdateState(resolver.State{Addresses: addrs})
+// Tests the case where the top-level cluster resource is an aggregate cluster
+// that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs
+// can be made to backends that make up the LOGICAL_DNS cluster.
The hostname of +// the LOGICAL_DNS cluster is updated, and the test verifies that RPCs can be +// made to backends that the new hostname resolves to. +func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) { + // Start an xDS management server. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup1() + + // Start two test backends and extract their host and port. The first + // backend is used initially for the LOGICAL_DNS cluster and an update + // switches the cluster to use the second backend. + servers, cleanup2 := startTestServiceBackends(t, 2) + defer cleanup2() + + // Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster. + const dnsClusterName = clusterName + "-dns" + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{dnsClusterName}), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create xDS client, configure cds_experimental LB policy with a manual + // resolver, and dial the test backends. + cc, cleanup := setupAndDial(t, bootstrapContents) + defer cleanup() // Make an RPC and ensure that it gets routed to the first backend since the // child policy for a LOGICAL_DNS cluster is pick_first by default. @@ -380,8 +463,35 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - if peer.Addr.String() != addrs[0].Addr { - t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + if peer.Addr.String() != servers[0].Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address) + } + + // Update the LOGICAL_DNS cluster's hostname to point to the second backend. + dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{dnsClusterName}), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that traffic moves to the second backend eventually. + for ctx.Err() == nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() == servers[1].Address { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to switch to the second backend") } } @@ -500,9 +610,6 @@ func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) { // cluster. The test verifies that RPCs are successful, this time to backends in // the DNS cluster. func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. 
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -513,15 +620,12 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr) // Configure an aggregate cluster pointing to a single EDS cluster. Also, // configure the underlying EDS cluster (and the corresponding endpoints // resource) and DNS cluster (will be used later in the test). - const ( - dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) - ) + const dnsClusterName = clusterName + "-dns" resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ @@ -563,20 +667,6 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { t.Fatal(err) } - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Ensure that start getting routed to the backend corresponding to the // LOGICAL_DNS cluster. for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { @@ -705,17 +795,14 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) { // the DNS resolver pushes an update, the test verifies that we switch to the // DNS cluster and can make a successful RPC. func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, _ := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS // cluster. Also configure an empty endpoints resource for the EDS cluster @@ -723,8 +810,6 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil) nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{ @@ -755,23 +840,9 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. 
- select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs}) - // Ensure that RPCs start getting routed to the first backend since the // child policy for a LOGICAL_DNS cluster is pick_first by default. - pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]) + pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address}) } // TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level @@ -780,34 +851,29 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { // good update, this test verifies the cluster_resolver balancer correctly falls // back from the LOGICAL_DNS cluster to the EDS cluster. func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, ports := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + _, edsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS // cluster. Also configure an endpoints resource for the EDS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080), e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone), }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -821,20 +887,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Push an error through the DNS resolver. - dnsR.ReportError(errors.New("some error")) - // RPCs should work, higher level DNS cluster errors so should fallback to // EDS cluster. 
client := testgrpc.NewTestServiceClient(cc) @@ -842,8 +894,8 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - if peer.Addr.String() != addrs[0].Addr { - t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + if peer.Addr.String() != server.Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address) } } @@ -854,9 +906,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { // error, the test verifies that RPCs fail with the error triggered by the DNS // Discovery Mechanism (from sending an empty address list down). func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -867,8 +916,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil) resources := e2e.UpdateOptions{ @@ -876,7 +923,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080), }, Endpoints: []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource}, SkipValidation: true, @@ -892,39 +939,20 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Make an RPC with a short deadline. We expect this RPC to not succeed - // because the EDS resource came back with no endpoints, and we are yet to - // push an update through the DNS resolver. - client := testgrpc.NewTestServiceClient(cc) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded) - } - - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Push an error from the DNS resolver as well. - dnsErr := fmt.Errorf("DNS error") - dnsR.ReportError(dnsErr) - // Ensure that the error from the DNS Resolver leads to an empty address // update for both priorities. 
- _, err := client.EmptyCall(ctx, &testpb.Empty{}) - if code := status.Code(err); code != codes.Unavailable { - t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable) + client := testgrpc.NewTestServiceClient(cc) + for ctx.Err() == nil { + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatal("EmptyCall() succeeded when expected to fail") + } + if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") { + break + } } - if err == nil || !strings.Contains(err.Error(), "produced zero addresses") { - t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err) + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error") } } @@ -937,9 +965,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { // previously received good update and that RPCs still get routed to the EDS // cluster. func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -950,14 +975,13 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. Also // configure an endpoints resource for the EDS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -980,20 +1004,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Make an RPC and ensure that it gets routed to the first backend since the // EDS cluster is of higher priority than the LOGICAL_DNS cluster. client := testgrpc.NewTestServiceClient(cc) @@ -1032,9 +1042,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test // the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response // as though it received an update with no endpoints. func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. 
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -1045,13 +1052,12 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -1080,20 +1086,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. peer := &peer.Peer{} client := testgrpc.NewTestServiceClient(cc) @@ -1111,9 +1103,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes // cluster. The test verifies that the cluster_resolver LB policy falls back to // the LOGICAL_DNS cluster in this case. func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -1121,14 +1110,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { // Start a test backend for the LOGICAL_DNS cluster. server := stubserver.StartTestService(t, nil) defer server.Stop() + dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. No // endpoints are configured for the EDS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -1177,35 +1165,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { } defer cc.Close() - // Make an RPC with a short deadline. We expect this RPC to not succeed - // because the DNS resolver has not responded with endpoint addresses. - client := testgrpc.NewTestServiceClient(cc) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded) - } - - // Ensure that the DNS resolver is started for the expected target. 
- select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}}) - // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. // Even though the EDS cluster is of higher priority, since the management // server does not respond with an EDS resource, the cluster_resolver LB // policy is expected to fallback to the LOGICAL_DNS cluster once the watch // timeout expires. peer := &peer.Peer{} + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index aaababa71c57..b9a81e9ba829 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -19,9 +19,11 @@ package clusterresolver import ( + "context" "sync" "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -83,9 +85,11 @@ type discoveryMechanismAndResolver struct { } type resourceResolver struct { - parent *clusterResolverBalancer - logger *grpclog.PrefixLogger - updateChannel chan *resourceUpdate + parent *clusterResolverBalancer + logger *grpclog.PrefixLogger + updateChannel chan *resourceUpdate + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc // mu protects the slice and map, and content of the resolvers in the slice. mu sync.Mutex @@ -106,12 +110,16 @@ type resourceResolver struct { } func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver { - return &resourceResolver{ + rr := &resourceResolver{ parent: parent, logger: logger, updateChannel: make(chan *resourceUpdate, 1), childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver), } + ctx, cancel := context.WithCancel(context.Background()) + rr.serializer = grpcsync.NewCallbackSerializer(ctx) + rr.serializerCancel = cancel + return rr } func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool { @@ -210,8 +218,9 @@ func (rr *resourceResolver) resolveNow() { } } -func (rr *resourceResolver) stop() { +func (rr *resourceResolver) stop(closing bool) { rr.mu.Lock() + // Save the previous childrenMap to stop the children outside the mutex, // and reinitialize the map. We only need to reinitialize to allow for the // policy to be reused if the resource comes back. In practice, this does @@ -222,12 +231,18 @@ func (rr *resourceResolver) stop() { rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver) rr.mechanisms = nil rr.children = nil + rr.mu.Unlock() for _, r := range cm { r.r.stop() } + if closing { + rr.serializerCancel() + <-rr.serializer.Done() + } + // stop() is called when the LB policy is closed or when the underlying // cluster resource is removed by the management server. 
In the latter case, // an empty config update needs to be pushed to the child policy to ensure @@ -272,7 +287,9 @@ func (rr *resourceResolver) generateLocked() { } func (rr *resourceResolver) onUpdate() { - rr.mu.Lock() - rr.generateLocked() - rr.mu.Unlock() + rr.serializer.Schedule(func(context.Context) { + rr.mu.Lock() + rr.generateLocked() + rr.mu.Unlock() + }) }