@@ -29,6 +29,7 @@ import (
2929 "time"
3030
3131 "github.com/stretchr/testify/require"
32+ "google.golang.org/grpc/metadata"
3233
3334 apiequality "k8s.io/apimachinery/pkg/api/equality"
3435 apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -1864,3 +1865,128 @@ func TestForgetWatcher(t *testing.T) {
18641865 assertCacherInternalState (0 , 0 )
18651866 require .Equal (t , 2 , forgetCounter )
18661867}
1868+
1869+ func TestWatchStreamSeparation (t * testing.T ) {
1870+ tcs := []struct {
1871+ name string
1872+ separateCacheWatchRPC bool
1873+ useWatchCacheContextMetadata bool
1874+ expectBookmarkOnWatchCache bool
1875+ expectBookmarkOnEtcd bool
1876+ }{
1877+ {
1878+ name : "common RPC > both get bookmarks" ,
1879+ separateCacheWatchRPC : false ,
1880+ expectBookmarkOnEtcd : true ,
1881+ expectBookmarkOnWatchCache : true ,
1882+ },
1883+ {
1884+ name : "common RPC & watch cache context > both get bookmarks" ,
1885+ separateCacheWatchRPC : false ,
1886+ useWatchCacheContextMetadata : true ,
1887+ expectBookmarkOnEtcd : true ,
1888+ expectBookmarkOnWatchCache : true ,
1889+ },
1890+ {
1891+ name : "separate RPC > only etcd gets bookmarks" ,
1892+ separateCacheWatchRPC : true ,
1893+ expectBookmarkOnEtcd : true ,
1894+ expectBookmarkOnWatchCache : false ,
1895+ },
1896+ {
1897+ name : "separate RPC & watch cache context > only watch cache gets bookmarks" ,
1898+ separateCacheWatchRPC : true ,
1899+ useWatchCacheContextMetadata : true ,
1900+ expectBookmarkOnEtcd : false ,
1901+ expectBookmarkOnWatchCache : true ,
1902+ },
1903+ }
1904+ for _ , tc := range tcs {
1905+ t .Run (tc .name , func (t * testing.T ) {
1906+ defer featuregatetesting .SetFeatureGateDuringTest (t , utilfeature .DefaultFeatureGate , features .SeparateCacheWatchRPC , tc .separateCacheWatchRPC )()
1907+ _ , cacher , _ , terminate := testSetupWithEtcdServer (t )
1908+ t .Cleanup (terminate )
1909+ if err := cacher .ready .wait (context .TODO ()); err != nil {
1910+ t .Fatalf ("unexpected error waiting for the cache to be ready" )
1911+ }
1912+
1913+ getCacherRV := func () uint64 {
1914+ cacher .watchCache .RLock ()
1915+ defer cacher .watchCache .RUnlock ()
1916+ return cacher .watchCache .resourceVersion
1917+ }
1918+ waitContext , cancel := context .WithTimeout (context .Background (), 2 * time .Second )
1919+ defer cancel ()
1920+ waitForEtcdBookmark := watchAndWaitForBookmark (t , waitContext , cacher .storage )
1921+
1922+ var out example.Pod
1923+ err := cacher .Create (context .Background (), "foo" , & example.Pod {}, & out , 0 )
1924+ if err != nil {
1925+ t .Fatal (err )
1926+ }
1927+ versioner := storage.APIObjectVersioner {}
1928+ var lastResourceVersion uint64
1929+ lastResourceVersion , err = versioner .ObjectResourceVersion (& out )
1930+ if err != nil {
1931+ t .Fatal (err )
1932+ }
1933+
1934+ var contextMetadata metadata.MD
1935+ if tc .useWatchCacheContextMetadata {
1936+ contextMetadata = cacher .watchCache .waitingUntilFresh .contextMetadata
1937+ }
1938+ // Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
1939+ // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
1940+ time .Sleep (time .Second )
1941+ err = cacher .storage .RequestWatchProgress (metadata .NewOutgoingContext (context .Background (), contextMetadata ))
1942+ if err != nil {
1943+ t .Fatal (err )
1944+ }
1945+ // Give time for bookmark to arrive
1946+ time .Sleep (time .Second )
1947+
1948+ etcdWatchResourceVersion := waitForEtcdBookmark ()
1949+ gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
1950+ if gotEtcdWatchBookmark != tc .expectBookmarkOnEtcd {
1951+ t .Errorf ("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v" , etcdWatchResourceVersion , etcdWatchResourceVersion , tc .expectBookmarkOnEtcd )
1952+ }
1953+
1954+ watchCacheResourceVersion := getCacherRV ()
1955+ cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
1956+ if cacherGotBookmark != tc .expectBookmarkOnWatchCache {
1957+ t .Errorf ("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v" , watchCacheResourceVersion , cacherGotBookmark , tc .expectBookmarkOnWatchCache )
1958+ }
1959+ })
1960+ }
1961+ }
1962+
1963+ func watchAndWaitForBookmark (t * testing.T , ctx context.Context , etcdStorage storage.Interface ) func () (resourceVersion uint64 ) {
1964+ opts := storage.ListOptions {ResourceVersion : "" , Predicate : storage .Everything , Recursive : true }
1965+ opts .Predicate .AllowWatchBookmarks = true
1966+ w , err := etcdStorage .Watch (ctx , "/pods/" , opts )
1967+ if err != nil {
1968+ t .Fatal (err )
1969+ }
1970+
1971+ versioner := storage.APIObjectVersioner {}
1972+ var rv uint64
1973+ var wg sync.WaitGroup
1974+ wg .Add (1 )
1975+ go func () {
1976+ defer wg .Done ()
1977+ for event := range w .ResultChan () {
1978+ if event .Type == watch .Bookmark {
1979+ rv , err = versioner .ObjectResourceVersion (event .Object )
1980+ break
1981+ }
1982+ }
1983+ }()
1984+ return func () (resourceVersion uint64 ) {
1985+ defer w .Stop ()
1986+ wg .Wait ()
1987+ if err != nil {
1988+ t .Fatal (err )
1989+ }
1990+ return rv
1991+ }
1992+ }
0 commit comments