Skip to content

Commit

Permalink
GOCBC-534: Add retry API
Browse files Browse the repository at this point in the history
Motivation
----------
The SDK should retry requests under some circumstances. It should
also provide the user with a way to customize this behavior.

Changes
-------
Added the retry API. Added RetryStrategy to all operations.
Updated http service based operations to attempt retries based
on responses. Updated timeouts across the board to contain more
information about retries.

Change-Id: I5dabb488abbdc161258fefeaa24391407d34a1ea
Reviewed-on: http://review.couchbase.org/116847
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Charles Dixon <chvckd@gmail.com>
  • Loading branch information
chvck committed Oct 29, 2019
1 parent 694c268 commit db833d3
Show file tree
Hide file tree
Showing 35 changed files with 2,222 additions and 904 deletions.
5 changes: 3 additions & 2 deletions analyticsquery_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type AnalyticsOptions struct {

// JSONSerializer is used to deserialize each row in the result. This should be a JSON deserializer as results are JSON.
// NOTE: if not set then query will always default to DefaultJSONSerializer.
Serializer JSONSerializer
Serializer JSONSerializer
RetryStrategy RetryStrategy
}

func (opts *AnalyticsOptions) toMap(statement string) (map[string]interface{}, error) {
Expand All @@ -46,7 +47,7 @@ func (opts *AnalyticsOptions) toMap(statement string) (map[string]interface{}, e
}

if opts.ClientContextID == "" {
execOpts["client_context_id"] = uuid.New()
execOpts["client_context_id"] = uuid.New().String()
} else {
execOpts["client_context_id"] = opts.ClientContextID
}
Expand Down
16 changes: 10 additions & 6 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func newBucket(sb *stateBlock, bucketName string, opts BucketOptions) *Bucket {

Transcoder: sb.Transcoder,
Serializer: sb.Serializer,

RetryStrategyWrapper: sb.RetryStrategyWrapper,
},
}
}
Expand Down Expand Up @@ -85,9 +87,10 @@ func (b *Bucket) ViewIndexes() (*ViewIndexManager, error) {
}

return &ViewIndexManager{
bucketName: b.Name(),
httpClient: provider,
globalTimeout: b.sb.ManagementTimeout,
bucketName: b.Name(),
httpClient: provider,
globalTimeout: b.sb.ManagementTimeout,
defaultRetryStrategy: b.sb.RetryStrategyWrapper,
}, nil
}

Expand All @@ -100,8 +103,9 @@ func (b *Bucket) CollectionManager() (*CollectionManager, error) {
}

return &CollectionManager{
httpClient: provider,
bucketName: b.Name(),
globalTimeout: b.sb.ManagementTimeout,
httpClient: provider,
bucketName: b.Name(),
globalTimeout: b.sb.ManagementTimeout,
defaultRetryStrategy: b.sb.RetryStrategyWrapper,
}, nil
}
171 changes: 116 additions & 55 deletions bucket_collectionsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
// CollectionManager provides methods for performing collections management.
// Volatile: This API is subject to change at any time.
type CollectionManager struct {
httpClient httpProvider
bucketName string
globalTimeout time.Duration
httpClient httpProvider
bucketName string
globalTimeout time.Duration
defaultRetryStrategy *retryStrategyWrapper
}

// CollectionSpec describes the specification of a collection.
Expand All @@ -33,8 +34,9 @@ type ScopeSpec struct {

// CollectionExistsOptions is the set of options available to the CollectionExists operation.
type CollectionExistsOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// These 3 types are temporary. They are necessary for now as the server beta was released with ns_server returning
Expand Down Expand Up @@ -76,14 +78,21 @@ func (cm *CollectionManager) CollectionExists(spec CollectionSpec, opts *Collect
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

posts := url.Values{}
posts.Add("name", spec.Name)

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
RetryStrategy: retryStrategy,
IsIdempotent: true,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -152,8 +161,9 @@ func (cm *CollectionManager) CollectionExists(spec CollectionSpec, opts *Collect

// ScopeExistsOptions is the set of options available to the ScopeExists operation.
type ScopeExistsOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// ScopeExists verifies whether or not a scope exists on the bucket.
Expand All @@ -173,11 +183,18 @@ func (cm *CollectionManager) ScopeExists(scopeName string, opts *ScopeExistsOpti
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
RetryStrategy: retryStrategy,
IsIdempotent: true,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -235,8 +252,9 @@ func (cm *CollectionManager) ScopeExists(scopeName string, opts *ScopeExistsOpti

// GetScopeOptions is the set of options available to the GetScope operation.
type GetScopeOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// GetScope gets a scope from the bucket.
Expand All @@ -256,11 +274,18 @@ func (cm *CollectionManager) GetScope(scopeName string, opts *GetScopeOptions) (
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
RetryStrategy: retryStrategy,
IsIdempotent: true,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -346,8 +371,9 @@ func (cm *CollectionManager) GetScope(scopeName string, opts *GetScopeOptions) (

// GetAllScopesOptions is the set of options available to the GetAllScopes operation.
type GetAllScopesOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// GetAllScopes gets all scopes from the bucket.
Expand All @@ -361,11 +387,18 @@ func (cm *CollectionManager) GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpe
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "GET",
Context: ctx,
RetryStrategy: retryStrategy,
IsIdempotent: true,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -440,8 +473,9 @@ func (cm *CollectionManager) GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpe

// CreateCollectionOptions is the set of options available to the CreateCollection operation.
type CreateCollectionOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// CreateCollection creates a new collection on the bucket.
Expand All @@ -467,16 +501,22 @@ func (cm *CollectionManager) CreateCollection(spec CollectionSpec, opts *CreateC
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

posts := url.Values{}
posts.Add("name", spec.Name)

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s", cm.bucketName, spec.ScopeName),
Method: "POST",
Body: []byte(posts.Encode()),
ContentType: "application/x-www-form-urlencoded",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s", cm.bucketName, spec.ScopeName),
Method: "POST",
Body: []byte(posts.Encode()),
ContentType: "application/x-www-form-urlencoded",
Context: ctx,
RetryStrategy: retryStrategy,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -510,8 +550,9 @@ func (cm *CollectionManager) CreateCollection(spec CollectionSpec, opts *CreateC

// DropCollectionOptions is the set of options available to the DropCollection operation.
type DropCollectionOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// DropCollection removes a collection.
Expand All @@ -537,11 +578,17 @@ func (cm *CollectionManager) DropCollection(spec CollectionSpec, opts *DropColle
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s/%s", cm.bucketName, spec.ScopeName, spec.Name),
Method: "DELETE",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s/%s", cm.bucketName, spec.ScopeName, spec.Name),
Method: "DELETE",
Context: ctx,
RetryStrategy: retryStrategy,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -575,8 +622,9 @@ func (cm *CollectionManager) DropCollection(spec CollectionSpec, opts *DropColle

// CreateScopeOptions is the set of options available to the CreateScope operation.
type CreateScopeOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// CreateScope creates a new scope on the bucket.
Expand All @@ -596,16 +644,22 @@ func (cm *CollectionManager) CreateScope(scopeName string, opts *CreateScopeOpti
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

posts := url.Values{}
posts.Add("name", scopeName)

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "POST",
Body: []byte(posts.Encode()),
ContentType: "application/x-www-form-urlencoded",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections", cm.bucketName),
Method: "POST",
Body: []byte(posts.Encode()),
ContentType: "application/x-www-form-urlencoded",
Context: ctx,
RetryStrategy: retryStrategy,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down Expand Up @@ -639,8 +693,9 @@ func (cm *CollectionManager) CreateScope(scopeName string, opts *CreateScopeOpti

// DropScopeOptions is the set of options available to the DropScope operation.
type DropScopeOptions struct {
Timeout time.Duration
Context context.Context
Timeout time.Duration
Context context.Context
RetryStrategy RetryStrategy
}

// DropScope removes a scope.
Expand All @@ -654,11 +709,17 @@ func (cm *CollectionManager) DropScope(scopeName string, opts *DropScopeOptions)
defer cancel()
}

retryStrategy := cm.defaultRetryStrategy
if opts.RetryStrategy == nil {
retryStrategy = newRetryStrategyWrapper(opts.RetryStrategy)
}

req := &gocbcore.HttpRequest{
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s", cm.bucketName, scopeName),
Method: "DELETE",
Context: ctx,
Service: gocbcore.ServiceType(MgmtService),
Path: fmt.Sprintf("/pools/default/buckets/%s/collections/%s", cm.bucketName, scopeName),
Method: "DELETE",
Context: ctx,
RetryStrategy: retryStrategy,
}

resp, err := cm.httpClient.DoHttpRequest(req)
Expand Down
Loading

0 comments on commit db833d3

Please sign in to comment.