Skip to content

Commit 6fe6085

Browse files
authored
xds/internal/server: switch to generic xDS API for LDS/RDS (#6726)
1 parent df8fc99 commit 6fe6085

File tree

3 files changed

+99
-62
lines changed

3 files changed

+99
-62
lines changed

xds/internal/server/listener_wrapper.go

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ type DrainCallback func(addr net.Addr)
6969
// XDSClient wraps the methods on the XDSClient which are required by
7070
// the listenerWrapper.
7171
type XDSClient interface {
72-
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
73-
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
72+
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
7473
BootstrapConfig() *bootstrap.Config
7574
}
7675

@@ -110,7 +109,6 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
110109
mode: connectivity.ServingModeStarting,
111110
closed: grpcsync.NewEvent(),
112111
goodUpdate: grpcsync.NewEvent(),
113-
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
114112
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
115113
}
116114
lw.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", lw))
@@ -120,17 +118,16 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
120118
lisAddr := lw.Listener.Addr().String()
121119
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)
122120

123-
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh)
124-
lw.cancelWatch = lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
121+
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.logger, lw.rdsUpdateCh)
122+
lw.cancelWatch = xdsresource.WatchListener(lw.xdsC, lw.name, &ldsWatcher{
123+
parent: lw,
124+
logger: lw.logger,
125+
name: lw.name,
126+
})
125127
go lw.run()
126128
return lw, lw.goodUpdate.Done()
127129
}
128130

129-
type ldsUpdateWithError struct {
130-
update xdsresource.ListenerUpdate
131-
err error
132-
}
133-
134131
// listenerWrapper wraps the net.Listener associated with the listening address
135132
// passed to Serve(). It also contains all other state associated with this
136133
// particular invocation of Serve().
@@ -181,8 +178,6 @@ type listenerWrapper struct {
181178
// rdsUpdates are the RDS resources received from the management
182179
// server, keyed on the RouteName of the RDS resource.
183180
rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate
184-
// ldsUpdateCh is a channel for XDSClient LDS updates.
185-
ldsUpdateCh chan ldsUpdateWithError
186181
// rdsUpdateCh is a channel for XDSClient RDS updates.
187182
rdsUpdateCh chan rdsHandlerUpdate
188183
}
@@ -320,31 +315,12 @@ func (l *listenerWrapper) run() {
320315
select {
321316
case <-l.closed.Done():
322317
return
323-
case u := <-l.ldsUpdateCh:
324-
l.handleLDSUpdate(u)
325318
case u := <-l.rdsUpdateCh:
326319
l.handleRDSUpdate(u)
327320
}
328321
}
329322
}
330323

331-
// handleLDSUpdate is the callback which handles LDS Updates. It writes the
332-
// received update to the update channel, which is picked up by the run
333-
// goroutine.
334-
func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) {
335-
if l.closed.HasFired() {
336-
l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
337-
return
338-
}
339-
// Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update
340-
// listener cares about is most recent update.
341-
select {
342-
case <-l.ldsUpdateCh:
343-
default:
344-
}
345-
l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err}
346-
}
347-
348324
// handleRDSUpdate handles a full rds update from rds handler. On a successful
349325
// update, the server will switch to ServingModeServing as the full
350326
// configuration (both LDS and RDS) has been received.
@@ -354,7 +330,6 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
354330
return
355331
}
356332
if update.err != nil {
357-
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
358333
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
359334
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
360335
}
@@ -368,17 +343,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
368343
l.goodUpdate.Fire()
369344
}
370345

371-
func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
372-
if update.err != nil {
373-
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
374-
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
375-
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
376-
}
377-
// For errors which are anything other than "resource-not-found", we
378-
// continue to use the old configuration.
379-
return
380-
}
381-
346+
func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) {
382347
// Make sure that the socket address on the received Listener resource
383348
// matches the address of the net.Listener passed to us by the user. This
384349
// check is done here instead of at the XDSClient layer because of the
@@ -391,7 +356,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
391356
// What this means is that the XDSClient has ACKed a resource which can push
392357
// the server into a "not serving" mode. This is not ideal, but this is
393358
// what we have decided to do. See gRPC A36 for more details.
394-
ilc := update.update.InboundListenerCfg
359+
ilc := update.InboundListenerCfg
395360
if ilc.Address != l.addr || ilc.Port != l.port {
396361
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
397362
return
@@ -440,3 +405,46 @@ func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMod
440405
l.modeCallback(l.Listener.Addr(), newMode, err)
441406
}
442407
}
408+
409+
// ldsWatcher implements the xdsresource.ListenerWatcher interface and is
410+
// passed to the WatchListener API.
411+
type ldsWatcher struct {
412+
parent *listenerWrapper
413+
logger *internalgrpclog.PrefixLogger
414+
name string
415+
}
416+
417+
func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
418+
if lw.parent.closed.HasFired() {
419+
lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update)
420+
return
421+
}
422+
if lw.logger.V(2) {
423+
lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource)
424+
}
425+
lw.parent.handleLDSUpdate(update.Resource)
426+
}
427+
428+
func (lw *ldsWatcher) OnError(err error) {
429+
if lw.parent.closed.HasFired() {
430+
lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err)
431+
return
432+
}
433+
if lw.logger.V(2) {
434+
lw.logger.Infof("LDS watch for resource %q reported error: %#v", lw.name, err)
435+
}
436+
// For errors which are anything other than "resource-not-found", we
437+
// continue to use the old configuration.
438+
}
439+
440+
func (lw *ldsWatcher) OnResourceDoesNotExist() {
441+
if lw.parent.closed.HasFired() {
442+
lw.logger.Warningf("Resource %q received resource-not-found-error after listener was closed", lw.name)
443+
return
444+
}
445+
if lw.logger.V(2) {
446+
lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name)
447+
}
448+
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name)
449+
lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err)
450+
}

xds/internal/server/rds_handler.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package server
2121
import (
2222
"sync"
2323

24+
igrpclog "google.golang.org/grpc/internal/grpclog"
2425
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
2526
)
2627

@@ -34,7 +35,8 @@ type rdsHandlerUpdate struct {
3435
// rdsHandler handles any RDS queries that need to be started for a given server
3536
// side listeners Filter Chains (i.e. not inline).
3637
type rdsHandler struct {
37-
xdsC XDSClient
38+
xdsC XDSClient
39+
logger *igrpclog.PrefixLogger
3840

3941
mu sync.Mutex
4042
updates map[string]xdsresource.RouteConfigUpdate
@@ -49,9 +51,10 @@ type rdsHandler struct {
4951
// newRDSHandler creates a new rdsHandler to watch for RDS resources.
5052
// listenerWrapper updates the list of route names to watch by calling
5153
// updateRouteNamesToWatch() upon receipt of new Listener configuration.
52-
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
54+
func newRDSHandler(xdsC XDSClient, logger *igrpclog.PrefixLogger, ch chan rdsHandlerUpdate) *rdsHandler {
5355
return &rdsHandler{
5456
xdsC: xdsC,
57+
logger: logger,
5558
updateChannel: ch,
5659
updates: make(map[string]xdsresource.RouteConfigUpdate),
5760
cancels: make(map[string]func()),
@@ -69,11 +72,11 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
6972
// routeNamesToWatch.
7073
for routeName := range routeNamesToWatch {
7174
if _, ok := rh.cancels[routeName]; !ok {
72-
func(routeName string) {
73-
rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) {
74-
rh.handleRouteUpdate(routeName, update, err)
75-
})
76-
}(routeName)
75+
// The xDS client keeps a reference to the watcher until the cancel
76+
// func is invoked. So, we don't need to keep a reference for fear
77+
// of it being garbage collected.
78+
w := &rdsWatcher{parent: rh, routeName: routeName}
79+
rh.cancels[routeName] = xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
7780
}
7881
}
7982

@@ -97,11 +100,7 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool)
97100
// handleRouteUpdate persists the route config for a given route name, and also
98101
// sends an update to the Listener Wrapper on an error received or if the rds
99102
// handler has a full collection of updates.
100-
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) {
101-
if err != nil {
102-
drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
103-
return
104-
}
103+
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate) {
105104
rh.mu.Lock()
106105
defer rh.mu.Unlock()
107106
rh.updates[routeName] = update
@@ -131,3 +130,33 @@ func (rh *rdsHandler) close() {
131130
cancel()
132131
}
133132
}
133+
134+
// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
135+
// passed to the WatchRouteConfig API.
136+
type rdsWatcher struct {
137+
parent *rdsHandler
138+
logger *igrpclog.PrefixLogger
139+
routeName string
140+
}
141+
142+
func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
143+
if rw.logger.V(2) {
144+
rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
145+
}
146+
rw.parent.handleRouteUpdate(rw.routeName, update.Resource)
147+
}
148+
149+
func (rw *rdsWatcher) OnError(err error) {
150+
if rw.logger.V(2) {
151+
rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
152+
}
153+
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
154+
}
155+
156+
func (rw *rdsWatcher) OnResourceDoesNotExist() {
157+
if rw.logger.V(2) {
158+
rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
159+
}
160+
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
161+
drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err})
162+
}

xds/internal/server/rds_handler_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) {
187187

188188
// Create an rds handler and give it a single route to watch.
189189
updateCh := make(chan rdsHandlerUpdate, 1)
190-
rh := newRDSHandler(xdsC, updateCh)
190+
rh := newRDSHandler(xdsC, nil, updateCh)
191191
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
192192

193193
// Verify that the given route is requested.
@@ -211,7 +211,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) {
211211

212212
// Create an rds handler and give it a single route to watch.
213213
updateCh := make(chan rdsHandlerUpdate, 1)
214-
rh := newRDSHandler(xdsC, updateCh)
214+
rh := newRDSHandler(xdsC, nil, updateCh)
215215
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
216216

217217
// Verify that the given route is requested.
@@ -273,7 +273,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) {
273273

274274
// Create an rds handler and give it two routes to watch.
275275
updateCh := make(chan rdsHandlerUpdate, 1)
276-
rh := newRDSHandler(xdsC, updateCh)
276+
rh := newRDSHandler(xdsC, nil, updateCh)
277277
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
278278

279279
// Verify that the given routes are requested.
@@ -329,7 +329,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
329329

330330
// Create an rds handler and give it two routes to watch.
331331
updateCh := make(chan rdsHandlerUpdate, 1)
332-
rh := newRDSHandler(xdsC, updateCh)
332+
rh := newRDSHandler(xdsC, nil, updateCh)
333333
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
334334

335335
// Verify that the given routes are requested.
@@ -400,7 +400,7 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
400400

401401
// Create an rds handler and give it three routes to watch.
402402
updateCh := make(chan rdsHandlerUpdate, 1)
403-
rh := newRDSHandler(xdsC, updateCh)
403+
rh := newRDSHandler(xdsC, nil, updateCh)
404404
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})
405405

406406
// Verify that the given routes are requested.
@@ -455,7 +455,7 @@ func (s) TestErrorReceived(t *testing.T) {
455455

456456
// Create an rds handler and give it a single route to watch.
457457
updateCh := make(chan rdsHandlerUpdate, 1)
458-
rh := newRDSHandler(xdsC, updateCh)
458+
rh := newRDSHandler(xdsC, nil, updateCh)
459459
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
460460

461461
// Verify that the given route is requested.

0 commit comments

Comments
 (0)