Skip to content

Commit 55341d7

Browse files
authored
xdsclient: correct logic used to suppress empty ADS requests on new streams (#7026)
1 parent f7c5e6a commit 55341d7

File tree

2 files changed

+207
-20
lines changed

2 files changed

+207
-20
lines changed

xds/internal/xdsclient/transport/transport.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -363,29 +363,21 @@ func (t *Transport) send(ctx context.Context) {
363363
// The xDS protocol only requires that we send the node proto in the first
364364
// discovery request on every stream. Sending the node proto in every
365365
// request message wastes CPU resources on the client and the server.
366-
sendNodeProto := true
366+
sentNodeProto := false
367367
for {
368368
select {
369369
case <-ctx.Done():
370370
return
371371
case stream = <-t.adsStreamCh:
372372
// We have a new stream and we've to ensure that the node proto gets
373-
// sent out in the first request on the stream. At this point, we
374-
// might not have any registered watches. Setting this field to true
375-
// here will ensure that the node proto gets sent out along with the
376-
// discovery request when the first watch is registered.
377-
if len(t.resources) == 0 {
378-
sendNodeProto = true
379-
continue
380-
}
381-
382-
if !t.sendExisting(stream) {
373+
// sent out in the first request on the stream.
374+
var err error
375+
if sentNodeProto, err = t.sendExisting(stream); err != nil {
383376
// Send failed, clear the current stream. Attempt to resend will
384377
// only be made after a new stream is created.
385378
stream = nil
386379
continue
387380
}
388-
sendNodeProto = false
389381
case u, ok := <-t.adsRequestCh.Get():
390382
if !ok {
391383
// No requests will be sent after the adsRequestCh buffer is closed.
@@ -416,12 +408,12 @@ func (t *Transport) send(ctx context.Context) {
416408
// sending response back).
417409
continue
418410
}
419-
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, resources, url, version, nonce, nackErr); err != nil {
411+
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, resources, url, version, nonce, nackErr); err != nil {
420412
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err)
421413
// Send failed, clear the current stream.
422414
stream = nil
423415
}
424-
sendNodeProto = false
416+
sentNodeProto = true
425417
}
426418
}
427419
}
@@ -433,7 +425,9 @@ func (t *Transport) send(ctx context.Context) {
433425
// that here because the stream has just started and Send() usually returns
434426
// quickly (once it pushes the message onto the transport layer) and is only
435427
// ever blocked if we don't have enough flow control quota.
436-
func (t *Transport) sendExisting(stream adsStream) bool {
428+
//
429+
// Returns true if the node proto was sent.
430+
func (t *Transport) sendExisting(stream adsStream) (sentNodeProto bool, err error) {
437431
t.mu.Lock()
438432
defer t.mu.Unlock()
439433

@@ -450,16 +444,18 @@ func (t *Transport) sendExisting(stream adsStream) bool {
450444
t.nonces = make(map[string]string)
451445

452446
// Send node proto only in the first request on the stream.
453-
sendNodeProto := true
454447
for url, resources := range t.resources {
455-
if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
448+
if len(resources) == 0 {
449+
continue
450+
}
451+
if err := t.sendAggregatedDiscoveryServiceRequest(stream, !sentNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
456452
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err)
457-
return false
453+
return false, err
458454
}
459-
sendNodeProto = false
455+
sentNodeProto = true
460456
}
461457

462-
return true
458+
return sentNodeProto, nil
463459
}
464460

465461
// recv receives xDS responses on the provided ADS stream and branches out to

xds/internal/xdsclient/transport/transport_resource_test.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package transport_test
2121

2222
import (
2323
"context"
24+
"errors"
2425
"testing"
2526
"time"
2627

@@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
217218
})
218219
}
219220
}
221+
222+
func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
223+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
224+
defer cancel()
225+
226+
mgmtServer, cleanup := startFakeManagementServer(t)
227+
defer cleanup()
228+
t.Logf("Started xDS management server on %s", mgmtServer.Address)
229+
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
230+
tr, err := transport.New(transport.Options{
231+
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
232+
OnRecvHandler: func(update transport.ResourceUpdate) error {
233+
return nil
234+
},
235+
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
236+
OnErrorHandler: func(error) {}, // No stream error handling.
237+
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
238+
NodeProto: nodeProto,
239+
})
240+
if err != nil {
241+
t.Fatalf("Failed to create xDS transport: %v", err)
242+
}
243+
defer tr.Close()
244+
245+
// Send a request for a listener resource.
246+
const resource = "some-resource"
247+
tr.SendRequest(version.V3ListenerURL, []string{resource})
248+
249+
// Ensure the proper request was sent.
250+
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
251+
if err != nil {
252+
t.Fatalf("Error waiting for mgmt server response: %v", err)
253+
}
254+
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
255+
Node: nodeProto,
256+
ResourceNames: []string{resource},
257+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
258+
}}
259+
gotReq := val.(*fakeserver.Request)
260+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
261+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
262+
}
263+
264+
// Remove the subscription by requesting an empty list.
265+
tr.SendRequest(version.V3ListenerURL, []string{})
266+
267+
// Ensure the proper request was sent.
268+
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
269+
if err != nil {
270+
t.Fatalf("Error waiting for mgmt server response: %v", err)
271+
}
272+
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
273+
ResourceNames: []string{},
274+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
275+
}}
276+
gotReq = val.(*fakeserver.Request)
277+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
278+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
279+
}
280+
281+
// Cause the stream to restart.
282+
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
283+
284+
// Ensure no request is sent since there are no resources.
285+
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
286+
defer cancel()
287+
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
288+
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
289+
}
290+
291+
tr.SendRequest(version.V3ListenerURL, []string{resource})
292+
293+
// Ensure the proper request was sent with the node proto.
294+
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
295+
if err != nil {
296+
t.Fatalf("Error waiting for mgmt server response: %v", err)
297+
}
298+
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
299+
Node: nodeProto,
300+
ResourceNames: []string{resource},
301+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
302+
}}
303+
gotReq = val.(*fakeserver.Request)
304+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
305+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
306+
}
307+
308+
}
309+
310+
func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
311+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
312+
defer cancel()
313+
314+
mgmtServer, cleanup := startFakeManagementServer(t)
315+
defer cleanup()
316+
t.Logf("Started xDS management server on %s", mgmtServer.Address)
317+
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
318+
tr, err := transport.New(transport.Options{
319+
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
320+
OnRecvHandler: func(update transport.ResourceUpdate) error {
321+
return nil
322+
},
323+
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
324+
OnErrorHandler: func(error) {}, // No stream error handling.
325+
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
326+
NodeProto: nodeProto,
327+
})
328+
if err != nil {
329+
t.Fatalf("Failed to create xDS transport: %v", err)
330+
}
331+
defer tr.Close()
332+
333+
// Send a request for a listener resource.
334+
const resource = "some-resource"
335+
tr.SendRequest(version.V3ListenerURL, []string{resource})
336+
337+
// Ensure the proper request was sent.
338+
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
339+
if err != nil {
340+
t.Fatalf("Error waiting for mgmt server response: %v", err)
341+
}
342+
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
343+
Node: nodeProto,
344+
ResourceNames: []string{resource},
345+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
346+
}}
347+
gotReq := val.(*fakeserver.Request)
348+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
349+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
350+
}
351+
352+
// Send a request for a cluster resource.
353+
tr.SendRequest(version.V3ClusterURL, []string{resource})
354+
355+
// Ensure the proper request was sent.
356+
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
357+
if err != nil {
358+
t.Fatalf("Error waiting for mgmt server response: %v", err)
359+
}
360+
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
361+
ResourceNames: []string{resource},
362+
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
363+
}}
364+
gotReq = val.(*fakeserver.Request)
365+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
366+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
367+
}
368+
369+
// Remove the cluster subscription by requesting an empty list.
370+
tr.SendRequest(version.V3ClusterURL, []string{})
371+
372+
// Ensure the proper request was sent.
373+
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
374+
if err != nil {
375+
t.Fatalf("Error waiting for mgmt server response: %v", err)
376+
}
377+
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
378+
ResourceNames: []string{},
379+
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
380+
}}
381+
gotReq = val.(*fakeserver.Request)
382+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
383+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
384+
}
385+
386+
// Cause the stream to restart.
387+
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}
388+
389+
// Ensure the proper LDS request was sent.
390+
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
391+
if err != nil {
392+
t.Fatalf("Error waiting for mgmt server response: %v", err)
393+
}
394+
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
395+
Node: nodeProto,
396+
ResourceNames: []string{resource},
397+
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
398+
}}
399+
gotReq = val.(*fakeserver.Request)
400+
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
401+
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
402+
}
403+
404+
// Ensure no cluster request is sent since there are no cluster resources.
405+
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
406+
defer cancel()
407+
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
408+
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
409+
}
410+
}

0 commit comments

Comments
 (0)