Skip to content

Commit

Permalink
xdsclient: update watcher API as per gRFC A88
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Jan 6, 2025
1 parent 724f450 commit 43c9adb
Show file tree
Hide file tree
Showing 23 changed files with 384 additions and 365 deletions.
35 changes: 10 additions & 25 deletions xds/csds/csds_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,37 @@ func Test(t *testing.T) {

type nopListenerWatcher struct{}

func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopRouteConfigWatcher struct{}

func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnResourceChanged(_ *xdsresource.RouteConfigResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopRouteConfigWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopClusterWatcher struct{}

func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnResourceChanged(_ *xdsresource.ClusterResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopClusterWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

type nopEndpointsWatcher struct{}

func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnResourceChanged(_ *xdsresource.EndpointsResourceData, _ error, onDone xdsresource.OnDoneFunc) {
onDone()
}
func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (nopEndpointsWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
onDone()
}

Expand All @@ -137,13 +125,10 @@ func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWa
}
}

func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnResourceChanged(_ *xdsresource.ListenerResourceData, _ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}
func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
func (w *blockingListenerWatcher) OnAmbientError(_ error, onDone xdsresource.OnDoneFunc) {
writeOnDone(w.testCtxDone, w.onDoneCh, onDone)
}

Expand Down
13 changes: 6 additions & 7 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (b *cdsBalancer) ResolverError(err error) {
if b.lbCfg != nil {
root = b.lbCfg.ClusterName
}
b.onClusterError(root, err)
b.onClusterAmbientError(root, err)
})
}

Expand Down Expand Up @@ -428,20 +428,20 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
// If the security config is invalid, for example, if the provider
// instance is not found in the bootstrap config, we need to put the
// channel in transient failure.
b.onClusterError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
b.onClusterAmbientError(name, fmt.Errorf("received Cluster resource contains invalid security config: %v", err))
return
}
}

clustersSeen := make(map[string]bool)
dms, ok, err := b.generateDMsForCluster(b.lbCfg.ClusterName, 0, nil, clustersSeen)
if err != nil {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("failed to generate discovery mechanisms: %v", err))
return
}
if ok {
if len(dms) == 0 {
b.onClusterError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
b.onClusterAmbientError(b.lbCfg.ClusterName, fmt.Errorf("aggregate cluster graph has no leaf clusters"))
return
}
// Child policy is built the first time we resolve the cluster graph.
Expand Down Expand Up @@ -501,7 +501,7 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterError(name string, err error) {
func (b *cdsBalancer) onClusterAmbientError(name string, err error) {
b.logger.Warningf("Cluster resource %q received error update: %v", name, err)

if b.childLB != nil {
Expand All @@ -525,8 +525,7 @@ func (b *cdsBalancer) onClusterError(name string, err error) {
// TRANSIENT_FAILURE.
//
// Only executed in the context of a serializer callback.
func (b *cdsBalancer) onClusterResourceNotFound(name string) {
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Cluster not found in received response", name)
func (b *cdsBalancer) onClusterResourceChangedError(name string, err error) {

Check failure on line 528 in xds/internal/balancer/cdsbalancer/cdsbalancer.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'name' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
if b.childLB != nil {
b.childLB.ResolverError(err)
} else {
Expand Down
16 changes: 8 additions & 8 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ type clusterWatcher struct {
parent *cdsBalancer
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData, onDone xdsresource.OnDoneFunc) {
func (cw *clusterWatcher) OnResourceChanged(u *xdsresource.ClusterResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { cw.parent.onClusterResourceChangedError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) { cw.parent.onClusterUpdate(cw.name, u.Resource); onDone() }
cw.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (cw *clusterWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterError(cw.name, err); onDone() }
func (cw *clusterWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { cw.parent.onClusterAmbientError(cw.name, err); onDone() }
cw.parent.serializer.ScheduleOr(handleError, onDone)
}

func (cw *clusterWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { cw.parent.onClusterResourceNotFound(cw.name); onDone() }
cw.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

// watcherState groups the state associated with a clusterWatcher.
type watcherState struct {
watcher *clusterWatcher // The underlying watcher.
Expand Down
49 changes: 24 additions & 25 deletions xds/internal/balancer/clusterresolver/resource_resolver_eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,42 @@ func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelR
}

// OnUpdate is invoked to report an update for the resource being watched.
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnResourceChanged(update *xdsresource.EndpointsResourceData, err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if err != nil {
if er.logger.V(2) {
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
} else {
er.logger.Infof("EDS discovery mechanism for resource %q reported on resource changed error: %v", er.nameToWatch, err)
}
}
// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
return
}

er.mu.Lock()
er.update = &update.Resource
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFunc) {
func (er *edsDiscoveryMechanism) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
Expand Down Expand Up @@ -119,26 +141,3 @@ func (er *edsDiscoveryMechanism) OnError(err error, onDone xdsresource.OnDoneFun

er.topLevelResolver.onUpdate(onDone)
}

func (er *edsDiscoveryMechanism) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
if er.stopped.HasFired() {
onDone()
return
}

if er.logger.V(2) {
er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
}

// Report an empty update that would result in no priority child being
// created for this discovery mechanism. This would result in the priority
// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
// localities) if this was the only discovery mechanism, or would result in
// the priority LB policy using a lower priority discovery mechanism when
// that becomes available.
er.mu.Lock()
er.update = &xdsresource.EndpointsUpdate{}
er.mu.Unlock()

er.topLevelResolver.onUpdate(onDone)
}
32 changes: 16 additions & 16 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatch
return lw
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
func (l *listenerWatcher) OnResourceChanged(update *xdsresource.ListenerResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { l.parent.onListenerResourceChangedError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update.Resource); onDone() }
l.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (l *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() }
func (l *listenerWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() }
l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { l.parent.onListenerResourceNotFound(); onDone() }
l.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (l *listenerWatcher) stop() {
l.cancel()
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
Expand All @@ -68,24 +68,24 @@ func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfi
return rw
}

func (r *routeConfigWatcher) OnUpdate(u *xdsresource.RouteConfigResourceData, onDone xdsresource.OnDoneFunc) {
func (r *routeConfigWatcher) OnResourceChanged(u *xdsresource.RouteConfigResourceData, err error, onDone xdsresource.OnDoneFunc) {
if err != nil {
handleError := func(context.Context) { r.parent.onRouteConfigResourceChangedError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
return
}
handleUpdate := func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, u.Resource)
onDone()
}
r.parent.serializer.ScheduleOr(handleUpdate, onDone)
}

func (r *routeConfigWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() }
func (r *routeConfigWatcher) OnAmbientError(err error, onDone xdsresource.OnDoneFunc) {
handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() }
r.parent.serializer.ScheduleOr(handleError, onDone)
}

func (r *routeConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
handleNotFound := func(context.Context) { r.parent.onRouteConfigResourceNotFound(r.resourceName); onDone() }
r.parent.serializer.ScheduleOr(handleNotFound, onDone)
}

func (r *routeConfigWatcher) stop() {
r.cancel()
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
Expand Down
20 changes: 14 additions & 6 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,17 +518,21 @@ func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate
r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
}

func (r *xdsResolver) onListenerResourceError(err error) {
func (r *xdsResolver) onListenerResourceAmbientError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onListenerResourceNotFound() {
func (r *xdsResolver) onListenerResourceChangedError(err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
} else {
r.logger.Infof("Received on-resource-changed error for Listener resource %q: %v", r.ldsResourceName, err)
}
}

r.listenerUpdateRecvd = false
Expand Down Expand Up @@ -559,17 +563,21 @@ func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresourc
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
}
r.onError(err)
}

// Only executed in the context of a serializer callback.
func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
func (r *xdsResolver) onRouteConfigResourceChangedError(name string, err error) {
if r.logger.V(2) {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
} else {
r.logger.Infof("Received on-resource-changed error for RouteConfiguration resource %q: %v", name, err)
}
}

if r.rdsResourceName != name {
Expand Down
Loading

0 comments on commit 43c9adb

Please sign in to comment.