From bfb7d34cb9422e3b6ac20d2ea4b3c16c4fd93de4 Mon Sep 17 00:00:00 2001 From: Andrew Harding Date: Fri, 4 Oct 2024 12:45:14 -0600 Subject: [PATCH 1/2] Add custom backoff strategy option (#302) Signed-off-by: Andrew Harding --- v2/workloadapi/backoff.go | 47 ++++++++++++++++++++++++--------- v2/workloadapi/backoff_test.go | 23 ++++++---------- v2/workloadapi/client.go | 19 +++++++------- v2/workloadapi/client_test.go | 48 +++++++++++++++++++++++++++++++--- v2/workloadapi/option.go | 17 +++++++++--- 5 files changed, 110 insertions(+), 44 deletions(-) diff --git a/v2/workloadapi/backoff.go b/v2/workloadapi/backoff.go index b6ef1ed5..5ff91261 100644 --- a/v2/workloadapi/backoff.go +++ b/v2/workloadapi/backoff.go @@ -5,30 +5,51 @@ import ( "time" ) -// backoff defines an linear backoff policy. -type backoff struct { - InitialDelay time.Duration - MaxDelay time.Duration +// BackoffStrategy provides backoff facilities. +type BackoffStrategy interface { + // NewBackoff returns a new backoff for the strategy. The returned + // Backoff is in the same state that it would be in after a call to + // Reset(). + NewBackoff() Backoff +} + +// Backoff provides backoff for a workload API operation. +type Backoff interface { + // Next returns the next backoff period. + Next() time.Duration + + // Reset() resets the backoff. + Reset() +} + +type defaultBackoffStrategy struct{} + +func (defaultBackoffStrategy) NewBackoff() Backoff { + return newLinearBackoff() +} + +// linearBackoff defines an linear backoff policy. +type linearBackoff struct { + initialDelay time.Duration + maxDelay time.Duration n int } -func newBackoff() *backoff { - return &backoff{ - InitialDelay: time.Second, - MaxDelay: 30 * time.Second, +func newLinearBackoff() *linearBackoff { + return &linearBackoff{ + initialDelay: time.Second, + maxDelay: 30 * time.Second, n: 0, } } -// Duration returns the next wait period for the backoff. Not goroutine-safe. -func (b *backoff) Duration() time.Duration { +func (b *linearBackoff) Next() time.Duration { backoff := float64(b.n) + 1 - d := math.Min(b.InitialDelay.Seconds()*backoff, b.MaxDelay.Seconds()) + d := math.Min(b.initialDelay.Seconds()*backoff, b.maxDelay.Seconds()) b.n++ return time.Duration(d) * time.Second } -// Reset resets the backoff's state. -func (b *backoff) Reset() { +func (b *linearBackoff) Reset() { b.n = 0 } diff --git a/v2/workloadapi/backoff_test.go b/v2/workloadapi/backoff_test.go index 9e25e323..e9b4c763 100644 --- a/v2/workloadapi/backoff_test.go +++ b/v2/workloadapi/backoff_test.go @@ -7,34 +7,27 @@ import ( "github.com/stretchr/testify/require" ) -func TestBackoff(t *testing.T) { - new := func() *backoff { //nolint:all - b := newBackoff() - b.InitialDelay = time.Second - b.MaxDelay = 30 * time.Second - return b - } - - testUntilMax := func(t *testing.T, b *backoff) { +func TestLinearBackoff(t *testing.T) { + testUntilMax := func(t *testing.T, b *linearBackoff) { for i := 1; i < 30; i++ { - require.Equal(t, time.Duration(i)*time.Second, b.Duration()) + require.Equal(t, time.Duration(i)*time.Second, b.Next()) } - require.Equal(t, 30*time.Second, b.Duration()) - require.Equal(t, 30*time.Second, b.Duration()) - require.Equal(t, 30*time.Second, b.Duration()) + require.Equal(t, 30*time.Second, b.Next()) + require.Equal(t, 30*time.Second, b.Next()) + require.Equal(t, 30*time.Second, b.Next()) } t.Run("test max", func(t *testing.T) { t.Parallel() - b := new() + b := newLinearBackoff() testUntilMax(t, b) }) t.Run("test reset", func(t *testing.T) { t.Parallel() - b := new() + b := newLinearBackoff() testUntilMax(t, b) b.Reset() diff --git a/v2/workloadapi/client.go b/v2/workloadapi/client.go index 4d5de5d5..7739798b 100644 --- a/v2/workloadapi/client.go +++ b/v2/workloadapi/client.go @@ -119,7 +119,7 @@ func (c *Client) FetchX509Bundles(ctx context.Context) (*x509bundle.Set, error) // WatchX509Bundles watches for changes to the X.509 bundles. The watcher receives // the updated X.509 bundles. func (c *Client) WatchX509Bundles(ctx context.Context, watcher X509BundleWatcher) error { - backoff := newBackoff() + backoff := c.config.backoffStrategy.NewBackoff() for { err := c.watchX509Bundles(ctx, watcher, backoff) watcher.OnX509BundlesWatchError(err) @@ -152,7 +152,7 @@ func (c *Client) FetchX509Context(ctx context.Context) (*X509Context, error) { // WatchX509Context watches for updates to the X.509 context. The watcher // receives the updated X.509 context. func (c *Client) WatchX509Context(ctx context.Context, watcher X509ContextWatcher) error { - backoff := newBackoff() + backoff := c.config.backoffStrategy.NewBackoff() for { err := c.watchX509Context(ctx, watcher, backoff) watcher.OnX509ContextWatchError(err) @@ -224,7 +224,7 @@ func (c *Client) FetchJWTBundles(ctx context.Context) (*jwtbundle.Set, error) { // WatchJWTBundles watches for changes to the JWT bundles. The watcher receives // the updated JWT bundles. func (c *Client) WatchJWTBundles(ctx context.Context, watcher JWTBundleWatcher) error { - backoff := newBackoff() + backoff := c.config.backoffStrategy.NewBackoff() for { err := c.watchJWTBundles(ctx, watcher, backoff) watcher.OnJWTBundlesWatchError(err) @@ -258,7 +258,7 @@ func (c *Client) newConn(ctx context.Context) (*grpc.ClientConn, error) { return grpc.DialContext(ctx, c.config.address, c.config.dialOptions...) //nolint:staticcheck // preserve backcompat with WithDialOptions option } -func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backoff) error { +func (c *Client) handleWatchError(ctx context.Context, err error, backoff Backoff) error { code := status.Code(err) if code == codes.Canceled { return err @@ -270,7 +270,7 @@ func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backo } c.config.log.Errorf("Failed to watch the Workload API: %v", err) - retryAfter := backoff.Duration() + retryAfter := backoff.Next() c.config.log.Debugf("Retrying watch in %s", retryAfter) select { case <-time.After(retryAfter): @@ -281,7 +281,7 @@ func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backo } } -func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatcher, backoff *backoff) error { +func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatcher, backoff Backoff) error { ctx, cancel := context.WithCancel(withHeader(ctx)) defer cancel() @@ -308,7 +308,7 @@ func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatche } } -func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher, backoff *backoff) error { +func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher, backoff Backoff) error { ctx, cancel := context.WithCancel(withHeader(ctx)) defer cancel() @@ -335,7 +335,7 @@ func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher, } } -func (c *Client) watchX509Bundles(ctx context.Context, watcher X509BundleWatcher, backoff *backoff) error { +func (c *Client) watchX509Bundles(ctx context.Context, watcher X509BundleWatcher, backoff Backoff) error { ctx, cancel := context.WithCancel(withHeader(ctx)) defer cancel() @@ -402,7 +402,8 @@ func withHeader(ctx context.Context) context.Context { func defaultClientConfig() clientConfig { return clientConfig{ - log: logger.Null, + log: logger.Null, + backoffStrategy: defaultBackoffStrategy{}, } } diff --git a/v2/workloadapi/client_test.go b/v2/workloadapi/client_test.go index 5040d8fb..aa2653dc 100644 --- a/v2/workloadapi/client_test.go +++ b/v2/workloadapi/client_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/x509" "sync" + "sync/atomic" "testing" "time" @@ -103,7 +104,10 @@ func TestFetchX509Bundles(t *testing.T) { func TestWatchX509Bundles(t *testing.T) { wl := fakeworkloadapi.New(t) defer wl.Stop() - c, err := New(context.Background(), WithAddr(wl.Addr())) + + backoffStrategy := &testBackoffStrategy{} + + c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy)) require.NoError(t, err) defer c.Close() @@ -149,6 +153,9 @@ func TestWatchX509Bundles(t *testing.T) { wl.Stop() tw.WaitForUpdates(1) assert.Len(t, tw.Errors(), 2) + + // Assert that there was the expected number of backoffs. + assert.Equal(t, 2, backoffStrategy.BackedOff()) } func TestFetchX509Context(t *testing.T) { @@ -213,7 +220,10 @@ func TestWatchX509Context(t *testing.T) { federatedCA := test.NewCA(t, federatedTD) wl := fakeworkloadapi.New(t) defer wl.Stop() - c, err := New(context.Background(), WithAddr(wl.Addr())) + + backoffStrategy := &testBackoffStrategy{} + + c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy)) require.NoError(t, err) defer c.Close() @@ -291,6 +301,9 @@ func TestWatchX509Context(t *testing.T) { cancel() wg.Wait() + + // Assert that there was the expected number of backoffs. + assert.Equal(t, 2, backoffStrategy.BackedOff()) } func TestFetchJWTSVID(t *testing.T) { @@ -375,7 +388,10 @@ func TestFetchJWTBundles(t *testing.T) { func TestWatchJWTBundles(t *testing.T) { wl := fakeworkloadapi.New(t) defer wl.Stop() - c, err := New(context.Background(), WithAddr(wl.Addr())) + + backoffStrategy := &testBackoffStrategy{} + + c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy)) require.NoError(t, err) defer c.Close() @@ -421,6 +437,9 @@ func TestWatchJWTBundles(t *testing.T) { wl.Stop() tw.WaitForUpdates(1) assert.Len(t, tw.Errors(), 2) + + // Assert that there was the expected number of backoffs. + assert.Equal(t, 2, backoffStrategy.BackedOff()) } func TestValidateJWTSVID(t *testing.T) { @@ -605,3 +624,26 @@ func (w *testWatcher) WaitForUpdates(expectedNumUpdates int) { } } } + +type testBackoffStrategy struct { + backedOff int32 +} + +func (s *testBackoffStrategy) NewBackoff() Backoff { + return testBackoff{backedOff: &s.backedOff} +} + +func (s *testBackoffStrategy) BackedOff() int { + return int(atomic.LoadInt32(&s.backedOff)) +} + +type testBackoff struct { + backedOff *int32 +} + +func (b testBackoff) Next() time.Duration { + atomic.AddInt32(b.backedOff, 1) + return time.Millisecond * 200 +} + +func (b testBackoff) Reset() {} diff --git a/v2/workloadapi/option.go b/v2/workloadapi/option.go index 00cab7d1..4997b8b7 100644 --- a/v2/workloadapi/option.go +++ b/v2/workloadapi/option.go @@ -35,6 +35,14 @@ func WithLogger(logger logger.Logger) ClientOption { }) } +// WithBackoff provides a custom backoff strategy that replaces the +// default backoff strategy (linear backoff). +func WithBackoffStrategy(backoffStrategy BackoffStrategy) ClientOption { + return clientOption(func(c *clientConfig) { + c.backoffStrategy = backoffStrategy + }) +} + // SourceOption are options that are shared among all option types. type SourceOption interface { configureX509Source(*x509SourceConfig) @@ -81,10 +89,11 @@ type BundleSourceOption interface { } type clientConfig struct { - address string - namedPipeName string - dialOptions []grpc.DialOption - log logger.Logger + address string + namedPipeName string + dialOptions []grpc.DialOption + log logger.Logger + backoffStrategy BackoffStrategy } type clientOption func(*clientConfig) From 1b87745e6f69392f84393e0f4325bc62b475acdc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 4 Oct 2024 12:57:47 -0600 Subject: [PATCH 2/2] Bump google.golang.org/grpc from 1.64.0 to 1.67.1 in /v2 (#303) Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.64.0 to 1.67.1. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.64.0...v1.67.1) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- v2/go.mod | 12 ++++++------ v2/go.sum | 24 ++++++++++++------------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/v2/go.mod b/v2/go.mod index 7374e486..1aaa679e 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-jose/go-jose/v4 v4.0.4 github.com/stretchr/testify v1.9.0 github.com/zeebo/errs v1.3.0 - google.golang.org/grpc v1.64.0 + google.golang.org/grpc v1.67.1 google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20 google.golang.org/protobuf v1.34.2 ) @@ -16,11 +16,11 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/v2/go.sum b/v2/go.sum index e8aec7c2..b44aac25 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -17,18 +17,18 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20 h1:MLBCGN1O7GzIx+cBiwfYPwtmZ41U3Mn/cotLJciaArI= google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20/go.mod h1:Nr5H8+MlGWr5+xX/STzdoEqJrO+YteqFbMyCsrb6mH0= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=