Skip to content

xds/resolver: move service watching tests to resolver_test package #6682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 46 additions & 184 deletions xds/internal/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,18 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/clusterspecifier"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -49,80 +43,6 @@ import (
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Microsecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// verifyUpdateFromResolver waits for the resolver to push an update to the fake
// resolver.ClientConn and verifies that update matches the provided service
// config.
//
// Tests that want to skip verifying the contents of the service config can pass
// an empty string.
//
// Returns the config selector from the state update pushed by the resolver.
// Tests that don't need the config selector can ignore the return value.
func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector {
t.Helper()

var state resolver.State
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
case state = <-stateCh:
if err := state.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
}
if wantSC == "" {
break
}
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC)
if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
cs := iresolver.GetConfigSelector(state)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
return cs
}

// buildResolverForTarget builds an xDS resolver for the given target. It
// returns the following:
// - a channel to read updates from the resolver
// - the newly created xDS resolver
func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) {
t.Helper()

builder := resolver.Get(xdsresolver.Scheme)
if builder == nil {
t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
}

stateCh := make(chan resolver.State, 1)
updateStateF := func(s resolver.State) error {
stateCh <- s
return nil
}
tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF}
r, err := builder.Build(target, tcc, resolver.BuildOptions{})
if err != nil {
t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err)
}
t.Cleanup(r.Close)
return stateCh, r
}

func init() {
balancer.Register(cspBalancerBuilder{})
clusterspecifier.Register(testClusterSpecifierPlugin{})
Expand Down Expand Up @@ -201,46 +121,24 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -276,21 +174,14 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
}

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down Expand Up @@ -328,46 +219,24 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -407,21 +276,14 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
// clusters, they still appear in the service config.

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down
Loading