Skip to content

Commit 749cc0c

Browse files
authored
Merge pull request #1151 from timebertt/fix/dynamic-rest-mapper
⚠️ DynamicRestMapper: return NoMatchError when resource doesn't exist
2 parents dba75e5 + b8e24db commit 749cc0c

File tree

2 files changed

+57
-76
lines changed

2 files changed

+57
-76
lines changed

pkg/client/apiutil/dynamicrestmapper.go

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package apiutil
1919
import (
2020
"errors"
2121
"sync"
22-
"time"
2322

2423
"golang.org/x/time/rate"
2524
"k8s.io/apimachinery/pkg/api/meta"
@@ -29,34 +28,12 @@ import (
2928
"k8s.io/client-go/restmapper"
3029
)
3130

32-
// ErrRateLimited is returned by a RESTMapper method if the number of API
33-
// calls has exceeded a limit within a certain time period.
34-
type ErrRateLimited struct {
35-
// Duration to wait until the next API call can be made.
36-
Delay time.Duration
37-
}
38-
39-
func (e ErrRateLimited) Error() string {
40-
return "too many API calls to the RESTMapper within a timeframe"
41-
}
42-
43-
// DelayIfRateLimited returns the delay time until the next API call is
44-
// allowed and true if err is of type ErrRateLimited. The zero
45-
// time.Duration value and false are returned if err is not a ErrRateLimited.
46-
func DelayIfRateLimited(err error) (time.Duration, bool) {
47-
var rlerr ErrRateLimited
48-
if errors.As(err, &rlerr) {
49-
return rlerr.Delay, true
50-
}
51-
return 0, false
52-
}
53-
5431
// dynamicRESTMapper is a RESTMapper that dynamically discovers resource
5532
// types at runtime.
5633
type dynamicRESTMapper struct {
5734
mu sync.RWMutex // protects the following fields
5835
staticMapper meta.RESTMapper
59-
limiter *dynamicLimiter
36+
limiter *rate.Limiter
6037
newMapper func() (meta.RESTMapper, error)
6138

6239
lazy bool
@@ -70,7 +47,7 @@ type DynamicRESTMapperOption func(*dynamicRESTMapper) error
7047
// WithLimiter sets the RESTMapper's underlying limiter to lim.
7148
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
7249
return func(drm *dynamicRESTMapper) error {
73-
drm.limiter = &dynamicLimiter{lim}
50+
drm.limiter = lim
7451
return nil
7552
}
7653
}
@@ -103,9 +80,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (me
10380
return nil, err
10481
}
10582
drm := &dynamicRESTMapper{
106-
limiter: &dynamicLimiter{
107-
rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
108-
},
83+
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
10984
newMapper: func() (meta.RESTMapper, error) {
11085
groupResources, err := restmapper.GetAPIGroupResources(client)
11186
if err != nil {
@@ -161,12 +136,13 @@ func (drm *dynamicRESTMapper) init() (err error) {
161136
// checkAndReload attempts to call the given callback, which is assumed to be dependent
162137
// on the data in the restmapper.
163138
//
164-
// If the callback returns a NoKindMatchError, it will attempt to reload
139+
// If the callback returns an error that matches the given error, it will attempt to reload
165140
// the RESTMapper's data and re-call the callback once that's occurred.
166141
// If the callback returns any other error, the function will return immediately regardless.
167142
//
168-
// It will take care
169-
// ensuring that reloads are rate-limitted and that extraneous calls aren't made.
143+
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
144+
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
145+
// the callback.
170146
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
171147
// not need to attempt to lock the restmapper).
172148
func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
@@ -199,7 +175,9 @@ func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsRel
199175
}
200176

201177
// we're still stale, so grab a rate-limit token if we can...
202-
if err := drm.limiter.checkRate(); err != nil {
178+
if !drm.limiter.Allow() {
179+
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
180+
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
203181
return err
204182
}
205183

@@ -305,19 +283,3 @@ func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, err
305283
})
306284
return singular, err
307285
}
308-
309-
// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users.
310-
type dynamicLimiter struct {
311-
*rate.Limiter
312-
}
313-
314-
// checkRate returns an ErrRateLimited if too many API calls have been made
315-
// within the set limit.
316-
func (b *dynamicLimiter) checkRate() error {
317-
res := b.Reserve()
318-
if res.Delay() == 0 {
319-
return nil
320-
}
321-
res.Cancel()
322-
return ErrRateLimited{res.Delay()}
323-
}

pkg/client/apiutil/dynamicrestmapper_test.go

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package apiutil_test
22

33
import (
4-
"errors"
54
"time"
65

76
. "github.com/onsi/ginkgo"
87
. "github.com/onsi/gomega"
8+
"github.com/onsi/gomega/format"
9+
"github.com/onsi/gomega/types"
910
"golang.org/x/time/rate"
1011
"k8s.io/apimachinery/pkg/api/meta"
1112
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -57,53 +58,49 @@ var _ = Describe("Dynamic REST Mapper", func() {
5758
})
5859

5960
It("should reload if not present in the cache", func() {
60-
By("reading successfully once")
61+
By("reading target successfully once")
6162
Expect(callWithTarget()).To(Succeed())
62-
Expect(callWithOther()).NotTo(Succeed())
6363

64-
By("asking for a something that didn't exist previously after adding it to the mapper")
64+
By("reading other not successfully")
65+
count := 0
6566
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
67+
count++
6668
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
67-
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
6869
}
69-
Expect(callWithOther()).To(Succeed())
70-
Expect(callWithTarget()).To(Succeed())
71-
})
70+
Expect(callWithOther()).To(beNoMatchError())
71+
Expect(count).To(Equal(1), "should reload exactly once")
7272

73-
It("should rate-limit reloads so that we don't get more than a certain number per second", func() {
74-
By("setting a small limit")
75-
*lim = *rate.NewLimiter(rate.Limit(1), 1)
76-
77-
By("forcing a reload after changing the mapper")
73+
By("reading both successfully now")
7874
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
75+
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
7976
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
8077
}
8178
Expect(callWithOther()).To(Succeed())
82-
83-
By("calling another time that would need a requery and failing")
84-
Eventually(func() bool {
85-
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
86-
}, "10s").Should(BeTrue())
79+
Expect(callWithTarget()).To(Succeed())
8780
})
8881

89-
It("should rate-limit then allow more at 1rps", func() {
82+
It("should rate-limit then allow more at configured rate", func() {
9083
By("setting a small limit")
91-
*lim = *rate.NewLimiter(rate.Limit(1), 1)
84+
*lim = *rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
9285

9386
By("forcing a reload after changing the mapper")
9487
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
9588
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
9689
}
97-
98-
By("calling twice to trigger rate limiting")
9990
Expect(callWithOther()).To(Succeed())
100-
Expect(callWithTarget()).NotTo(Succeed())
10191

102-
// by 2nd call loop should succeed because we canceled our 1st rate-limited token, then waited a full second
103-
By("calling until no longer rate-limited, 2nd call should succeed")
104-
Eventually(func() bool {
105-
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
106-
}, "2.5s", "1s").Should(BeFalse())
92+
By("calling another time to trigger rate limiting")
93+
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
94+
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
95+
}
96+
// if call consistently fails, we are sure, that it was rate-limited,
97+
// otherwise it would have reloaded and succeeded
98+
Consistently(callWithTarget, "90ms", "10ms").Should(beNoMatchError())
99+
100+
By("calling until no longer rate-limited")
101+
// once call succeeds, we are sure, that it was no longer rate-limited,
102+
// as it was allowed to reload and found matching kind/resource
103+
Eventually(callWithTarget, "30ms", "10ms").Should(And(Succeed(), Not(beNoMatchError())))
107104
})
108105

109106
It("should avoid reloading twice if two requests for the same thing come in", func() {
@@ -251,3 +248,25 @@ var _ = Describe("Dynamic REST Mapper", func() {
251248
})
252249
})
253250
})
251+
252+
func beNoMatchError() types.GomegaMatcher {
253+
return noMatchErrorMatcher{}
254+
}
255+
256+
type noMatchErrorMatcher struct{}
257+
258+
func (k noMatchErrorMatcher) Match(actual interface{}) (success bool, err error) {
259+
actualErr, actualOk := actual.(error)
260+
if !actualOk {
261+
return false, nil
262+
}
263+
264+
return meta.IsNoMatchError(actualErr), nil
265+
}
266+
267+
func (k noMatchErrorMatcher) FailureMessage(actual interface{}) (message string) {
268+
return format.Message(actual, "to be a NoMatchError")
269+
}
270+
func (k noMatchErrorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
271+
return format.Message(actual, "not to be a NoMatchError")
272+
}

0 commit comments

Comments
 (0)