616 & 617 #1

Closed · wants to merge 4 commits
37 changes: 21 additions & 16 deletions pkg/cache/cluster.go
@@ -209,7 +209,7 @@ type clusterCache struct {
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool

-// maximum time we allow watches to run before relisting the group/kind and restarting the watch
+// maximum time we allow watches to run before restarting them
watchResyncTimeout time.Duration
// sync retry timeout for cluster when sync error happens
clusterSyncRetryTimeout time.Duration
@@ -601,6 +601,10 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
}

listRetry.Steps = int(c.listRetryLimit)
+// We set the resource version to 0 below to proactively prevent the
+// list API call from reaching etcd and make the server fetch the data
+// from the watch cache instead.
+opts.ResourceVersion = "0"
err := retry.OnError(listRetry, c.listRetryFunc, func() error {
var ierr error
res, ierr = resClient.List(ctx, opts)
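For context on the addition above: passing ResourceVersion "0" on a list request is the standard client-go hint that lets the kube-apiserver answer from its watch cache instead of performing a quorum read against etcd, at the cost of possibly slightly stale data. A minimal sketch of the same idea against a bare dynamic client follows; the GVR, namespace, and client wiring are illustrative assumptions, not code from this PR.

package example

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/rest"
)

// listFromWatchCache lists Pods with ResourceVersion "0", which tells the API
// server that any cached version is acceptable, so the response can be served
// from its watch cache rather than reading through to etcd.
func listFromWatchCache(ctx context.Context, cfg *rest.Config) error {
    client, err := dynamic.NewForConfig(cfg)
    if err != nil {
        return err
    }
    gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} // assumed example resource
    list, err := client.Resource(gvr).Namespace("default").List(ctx, metav1.ListOptions{
        ResourceVersion: "0", // "0" = serve from cache if possible
    })
    if err != nil {
        return err
    }
    fmt.Printf("listed %d pods at resourceVersion %s\n", len(list.Items), list.GetResourceVersion())
    return nil
}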
@@ -651,6 +655,7 @@ func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourc
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
+timeoutSeconds := int64(c.watchResyncTimeout.Seconds())
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
if r := recover(); r != nil {
@@ -668,6 +673,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+options.TimeoutSeconds = &timeoutSeconds
res, err := resClient.Watch(ctx, options)
if apierrors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
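A note on the TimeoutSeconds addition above: setting ListOptions.TimeoutSeconds on the watch request asks the kube-apiserver to close the watch stream server-side once the timeout elapses, so long-running watches are bounded without a client-side timer. Below is a rough, self-contained sketch of such a bounded watch; the GVR, namespace, and client wiring are assumptions for illustration.

package example

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

// boundedWatch starts a watch that the API server terminates after roughly
// timeoutSeconds; once the result channel closes, the caller can re-establish
// the watch from the last resourceVersion it observed.
func boundedWatch(ctx context.Context, client dynamic.Interface, rv string, timeoutSeconds int64) error {
    gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"} // assumed example resource
    w, err := client.Resource(gvr).Namespace("default").Watch(ctx, metav1.ListOptions{
        ResourceVersion: rv,
        TimeoutSeconds:  &timeoutSeconds, // server closes the stream after this many seconds
    })
    if err != nil {
        return err
    }
    defer w.Stop()
    for event := range w.ResultChan() {
        fmt.Printf("event: %s\n", event.Type)
    }
    // Channel closed: the server-side timeout (or an error) ended the watch.
    return nil
}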
@@ -679,30 +685,21 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return err
}

-defer func() {
-w.Stop()
-resourceVersion = ""
-}()
-
-var watchResyncTimeoutCh <-chan time.Time
-if c.watchResyncTimeout > 0 {
-shouldResync := time.NewTimer(c.watchResyncTimeout)
-defer shouldResync.Stop()
-watchResyncTimeoutCh = shouldResync.C
-}
+defer w.Stop()

for {
select {
// stop watching when parent context got cancelled
case <-ctx.Done():
return nil

-// re-synchronize API state and restart watch periodically
-case <-watchResyncTimeoutCh:
-return fmt.Errorf("resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
-
// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
case <-w.Done():
+// The underlying retry watcher has stopped, possibly due to specifying an RV in
+// the watch request that is stale (error code 410). This forces us to relist
+// objects from the kube-apiserver to get a fresher RV and we invoke that relist
+// by resetting the locally stored RV.
+resourceVersion = ""
return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)

case event, ok := <-w.ResultChan():
@@ -712,8 +709,16 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo

obj, ok := event.Object.(*unstructured.Unstructured)
if !ok {
+// We failed to cast the object received in the watch event to something
+// that contains a resource version field. Because of that, we don't know
+// from what RV we should reinitialize the watch connection, so in order to
+// avoid any inconsistencies due to accidental skipping of a potential RV,
+// we reset the locally stored RV to forcefully invoke the list API call to
+// get it from the kube-apiserver.
+resourceVersion = ""
return fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
}
+resourceVersion = obj.GetResourceVersion()

c.recordEvent(event.Type, obj)
if kube.IsCRD(obj) {
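To summarize the resourceVersion bookkeeping introduced in this hunk and the one above: the loop advances the locally stored RV on every decoded event and resets it to the empty string whenever the RV can no longer be trusted, which makes the surrounding retry logic fall back to a fresh list before watching again. A simplified sketch of that pattern with client-go's RetryWatcher follows; the dynamic client plumbing and error strings are assumptions, not the exact code in this PR.

package example

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/cache"
    watchutil "k8s.io/client-go/tools/watch"
)

// watchLoop advances rv on every decoded event and returns "" whenever the RV
// can no longer be trusted, signalling the caller to relist before retrying.
func watchLoop(ctx context.Context, resClient dynamic.ResourceInterface, rv string) (string, error) {
    w, err := watchutil.NewRetryWatcher(rv, &cache.ListWatch{
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return resClient.Watch(ctx, options)
        },
    })
    if err != nil {
        return rv, err
    }
    defer w.Stop()

    for {
        select {
        case <-ctx.Done():
            return rv, nil
        case <-w.Done():
            // The retry watcher gave up (for example on a 410 Gone caused by a
            // stale rv); clear the RV so the caller relists for a fresh one.
            return "", fmt.Errorf("watch closed")
        case event, ok := <-w.ResultChan():
            if !ok {
                return "", fmt.Errorf("watch result channel closed")
            }
            obj, ok := event.Object.(*unstructured.Unstructured)
            if !ok {
                return "", fmt.Errorf("unexpected object type %T", event.Object)
            }
            rv = obj.GetResourceVersion()
            // ... process the event here ...
        }
    }
}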