Skip to content

xds/internal/resolver: switch to generic xDS API for LDS/RDS #6729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/xds/xds_client_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {

target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
_, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)
Expand Down
111 changes: 6 additions & 105 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -73,16 +72,6 @@ type xdsClusterManagerConfig struct {
Children map[string]xdsChildConfig `json:"children"`
}

// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
for cluster, ci := range r.activeClusters {
if atomic.LoadInt32(&ci.refCount) == 0 {
delete(r.activeClusters, cluster)
}
}
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
Expand Down Expand Up @@ -197,10 +186,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if v := atomic.AddInt32(ref, -1); v == 0 {
// This entry will be removed from activeClusters when
// producing the service config for the empty update.
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
},
Interceptor: interceptor,
Expand Down Expand Up @@ -342,97 +330,10 @@ func (cs *configSelector) stop() {
// selector; we need another update to delete clusters from the config (if
// we don't have another update pending already).
if needUpdate {
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
}
}

// newConfigSelector creates the config selector for su; may add entries to
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}

for i, rt := range su.virtualHost.Routes {
clusters := rinternal.NewWRR.(func() wrr.WRR)()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: clusterName,
}, 1)
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
})
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
})
}
}
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}

cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}

// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}

return cs, nil
}

// initializeCluster initializes entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Any created entries will have a ref count set
// to zero as their ref count will be incremented by incRefs.
func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
ci := cs.r.activeClusters[clusterName]
if ci == nil {
ci = &clusterInfo{refCount: 0}
cs.r.activeClusters[clusterName] = ci
}
cs.clusters[clusterName] = ci
cs.clusters[clusterName].cfg = cfg
}

type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
// cfg is the child configuration for this cluster, containing either the
// csp config or the cds cluster config.
cfg xdsChildConfig
}

type interceptorList struct {
Expand Down
210 changes: 51 additions & 159 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,185 +19,77 @@
package resolver

import (
"fmt"
"sync"
"time"
"context"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making a LDS to get the RouteConfig name.
type serviceUpdate struct {
// virtualHost contains routes and other configuration to route RPCs.
virtualHost *xdsresource.VirtualHost
// clusterSpecifierPlugins contains the configurations for any cluster
// specifier plugins emitted by the xdsclient.
clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
// ldsConfig contains configuration that applies to all routes.
ldsConfig ldsConfig
type listenerWatcher struct {
resourceName string
cancel func()
parent *xdsResolver
}

// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
// maxStreamDuration is from the HTTP connection manager's
// common_http_protocol_options field.
maxStreamDuration time.Duration
httpFilterConfig []xdsresource.HTTPFilter
func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher {
lw := &listenerWatcher{resourceName: resourceName, parent: parent}
lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw)
return lw
}

// watchService uses LDS and RDS to discover information about the provided
// serviceName.
//
// Note that during race (e.g. an xDS response is received while the user is
// calling cancel()), there's a small window where the callback can be called
// after the watcher is canceled. The caller needs to handle this case.
//
// TODO(easwars): Make this function a method on the xdsResolver type.
// Currently, there is a single call site for this function, and all arguments
// passed to it are fields of the xdsResolver type.
func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
w := &serviceUpdateWatcher{
logger: logger,
c: c,
serviceName: serviceName,
serviceCb: cb,
}
w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)

return w.close
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceUpdate(update.Resource)
})
}

// serviceUpdateWatcher handles LDS and RDS response, and calls the service
// callback at the right time.
type serviceUpdateWatcher struct {
logger *grpclog.PrefixLogger
c xdsclient.XDSClient
serviceName string
ldsCancel func()
serviceCb func(serviceUpdate, error)
lastUpdate serviceUpdate

mu sync.Mutex
closed bool
rdsName string
rdsCancel func()
func (l *listenerWatcher) OnError(err error) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceError(err)
})
}

func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if err != nil {
// We check the error type and do different things. For now, the only
// type we check is ResourceNotFound, which indicates the LDS resource
// was removed, and besides sending the error to callback, we also
// cancel the RDS watch.
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
w.lastUpdate = serviceUpdate{}
}
// The other error cases still return early without canceling the
// existing RDS watch.
w.serviceCb(serviceUpdate{}, err)
return
}

w.lastUpdate.ldsConfig = ldsConfig{
maxStreamDuration: update.MaxStreamDuration,
httpFilterConfig: update.HTTPFilters,
}

if update.InlineRouteConfig != nil {
// If there was an RDS watch, cancel it.
w.rdsName = ""
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}
func (l *listenerWatcher) OnResourceDoesNotExist() {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.onListenerResourceNotFound()
})
}

// Handle the inline RDS update as if it's from an RDS watch.
w.applyRouteConfigUpdate(*update.InlineRouteConfig)
return
}
func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
}

// RDS name from update is not an empty string, need RDS to fetch the
// routes.
type routeConfigWatcher struct {
resourceName string
cancel func()
parent *xdsResolver
}

if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
//
// If the route name did change, then we must wait until the first RDS
// update before reporting this LDS config.
if w.lastUpdate.virtualHost != nil {
// We want to send an update with the new fields from the new LDS
// (e.g. max stream duration), and old fields from the previous
// RDS.
//
// But note that this should only happen when virtual host is set,
// which means an RDS was received.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
if w.rdsCancel != nil {
w.rdsCancel()
}
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher {
rw := &routeConfigWatcher{resourceName: resourceName, parent: parent}
rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw)
return rw
}

func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
return
}
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
})
}

w.lastUpdate.virtualHost = matchVh
w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
w.serviceCb(w.lastUpdate, nil)
func (r *routeConfigWatcher) OnError(err error) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceError(r.resourceName, err)
})
}

func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
if w.rdsCancel == nil {
// This mean only the RDS watch is canceled, can happen if the LDS
// resource is removed.
return
}
if err != nil {
w.serviceCb(serviceUpdate{}, err)
return
}
w.applyRouteConfigUpdate(update)
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.onRouteConfigResourceNotFound(r.resourceName)
})
}

func (w *serviceUpdateWatcher) close() {
w.mu.Lock()
defer w.mu.Unlock()
w.closed = true
w.ldsCancel()
if w.rdsCancel != nil {
w.rdsCancel()
w.rdsCancel = nil
}
func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
}
Loading