Skip to content

Commit 6e5cfc6

Browse files
[Sotw][Linear cache] Ensure watches are properly considering subscription changes to not miss new resources (#1301)
This PR leverages the new Subscription interface passed into the CreateWatch method to properly track the resources returned as well as the evolution of the watch subscription. This allows a proper handling of resource re-subscription as well as semantics needed for the new wildcard mode Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
1 parent 19abeff commit 6e5cfc6

File tree

9 files changed

+871
-316
lines changed

9 files changed

+871
-316
lines changed

pkg/cache/v3/cache.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,17 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest
4040
// Though the methods may return mutable parts of the state for performance reasons,
4141
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
4242
type Subscription interface {
43-
// ReturnedResources returns a list of resources that were sent to the client and their associated versions.
43+
// ReturnedResources returns the list of resources the client currently knows and their associated versions.
4444
// The versions are:
4545
// - delta protocol: version of the specific resource set in the response.
4646
// - sotw protocol: version of the global response when the resource was last sent.
47+
// The returned map must not be altered by the Cache.
4748
ReturnedResources() map[string]string
4849

4950
// SubscribedResources returns the list of resources currently subscribed to by the client for the type.
5051
// For delta it keeps track of subscription updates across requests
5152
// For sotw it is a normalized view of the last request resources
53+
// The returned map must not be altered by the Cache.
5254
SubscribedResources() map[string]struct{}
5355

5456
// IsWildcard returns whether the client has a wildcard watch.
@@ -118,6 +120,11 @@ type Response interface {
118120
// Get the version in the Response.
119121
GetVersion() (string, error)
120122

123+
// GetReturnedResources returns the map of resources and their versions returned in the subscription.
124+
// It may include more resources than directly set in the response to consider the full state of the client.
125+
// The caller is expected to provide this unchanged to the next call to CreateWatch as part of the subscription.
126+
GetReturnedResources() map[string]string
127+
121128
// Get the context provided during response creation.
122129
GetContext() context.Context
123130
}
@@ -155,6 +162,12 @@ type RawResponse struct {
155162
// Resources to be included in the response.
156163
Resources []types.ResourceWithTTL
157164

165+
// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
166+
// including previously returned ones when using non-full state resources.
167+
// It allows the cache to know what the client knows. The server will transparently forward this
168+
// across requests, and the cache is responsible for its interpretation.
169+
ReturnedResources map[string]string
170+
158171
// Whether this is a heartbeat response. For xDS versions that support TTL, this
159172
// will be converted into a response that doesn't contain the actual resource protobuf.
160173
// This allows for more lightweight updates that server only to update the TTL timer.
@@ -207,6 +220,12 @@ type PassthroughResponse struct {
207220
DiscoveryResponse *discovery.DiscoveryResponse
208221

209222
ctx context.Context
223+
224+
// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
225+
// including previously returned ones when using non-full state resources.
226+
// It allows the cache to know what the client knows. The server will transparently forward this
227+
// across requests, and the cache is responsible for its interpretation.
228+
ReturnedResources map[string]string
210229
}
211230

212231
// DeltaPassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations.
@@ -264,6 +283,10 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
264283
return marshaledResponse.(*discovery.DiscoveryResponse), nil
265284
}
266285

286+
func (r *RawResponse) GetReturnedResources() map[string]string {
287+
return r.ReturnedResources
288+
}
289+
267290
// GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
268291
// We can do this because the marshaled response does not change across the calls.
269292
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
@@ -367,6 +390,10 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon
367390
return r.DiscoveryResponse, nil
368391
}
369392

393+
func (r *PassthroughResponse) GetReturnedResources() map[string]string {
394+
return r.ReturnedResources
395+
}
396+
370397
// GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.
371398
func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
372399
return r.DeltaDiscoveryResponse, nil

pkg/cache/v3/delta_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
1818
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
1919
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
20+
"github.com/envoyproxy/go-control-plane/pkg/log"
2021
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
2122
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2223
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
@@ -31,7 +32,7 @@ func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
3132
}
3233

3334
func TestSnapshotCacheDeltaWatch(t *testing.T) {
34-
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
35+
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
3536
watches := make(map[string]chan cache.DeltaResponse)
3637
subscriptions := make(map[string]stream.Subscription)
3738

@@ -113,7 +114,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
113114
}
114115

115116
func TestDeltaRemoveResources(t *testing.T) {
116-
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
117+
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
117118
watches := make(map[string]chan cache.DeltaResponse)
118119
subscriptions := make(map[string]*stream.Subscription)
119120

@@ -188,7 +189,7 @@ func TestDeltaRemoveResources(t *testing.T) {
188189
}
189190

190191
func TestConcurrentSetDeltaWatch(t *testing.T) {
191-
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
192+
c := cache.NewSnapshotCache(false, group{}, log.NewTestLogger(t))
192193
for i := 0; i < 50; i++ {
193194
version := fmt.Sprintf("v%d", i)
194195
func(i int) {
@@ -222,7 +223,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
222223
type testKey struct{}
223224

224225
func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
225-
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
226+
c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t))
226227

227228
// Create a non-buffered channel that will block sends.
228229
watchCh := make(chan cache.DeltaResponse)
@@ -267,7 +268,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {
267268
}
268269

269270
func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
270-
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
271+
c := cache.NewSnapshotCache(true, group{}, log.NewTestLogger(t))
271272
for _, typ := range testTypes {
272273
responses := make(chan cache.DeltaResponse, 1)
273274
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{

0 commit comments

Comments
 (0)