diff --git a/bucket.go b/bucket.go index 582b8b3..6720049 100644 --- a/bucket.go +++ b/bucket.go @@ -148,10 +148,9 @@ func (b *Bucket) ViewIndexes() *ViewIndexManager { func (b *Bucket) Collections() *CollectionManager { // TODO: return error for unsupported collections return &CollectionManager{ - mgmtProvider: b, - bucketName: b.Name(), - tracer: b.tracer, - meter: b.meter, + getProvider: func() (collectionsManagementProvider, error) { + return b.connectionManager.getCollectionsManagementProvider(b.Name()) + }, } } diff --git a/bucket_collectionsmgr.go b/bucket_collectionsmgr.go index da33e74..349f2bb 100644 --- a/bucket_collectionsmgr.go +++ b/bucket_collectionsmgr.go @@ -2,17 +2,7 @@ package gocb import ( "context" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/url" - "strings" "time" - - "github.com/google/uuid" - - "github.com/couchbase/gocbcore/v10" ) // CollectionSpec describes the specification of a collection. @@ -28,56 +18,9 @@ type ScopeSpec struct { Collections []CollectionSpec } -// These 3 types are temporary. They are necessary for now as the server beta was released with ns_server returning -// a different jsonManifest format to what it will return in the future. -type jsonManifest struct { - UID uint64 `json:"uid"` - Scopes map[string]jsonManifestScope `json:"scopes"` -} - -type jsonManifestScope struct { - UID uint32 `json:"uid"` - Collections map[string]jsonManifestCollection `json:"collections"` -} - -type jsonManifestCollection struct { - UID uint32 `json:"uid"` -} - // CollectionManager provides methods for performing collections management. type CollectionManager struct { - mgmtProvider mgmtProvider - bucketName string - tracer RequestTracer - meter *meterWrapper -} - -func (cm *CollectionManager) tryParseErrorMessage(req *mgmtRequest, resp *mgmtResponse) error { - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - logDebugf("failed to read http body: %s", err) - return nil - } - - errText := strings.ToLower(string(b)) - - if err := checkForRateLimitError(resp.StatusCode, errText); err != nil { - return makeGenericMgmtError(err, req, resp, string(b)) - } - - if strings.Contains(errText, "not found") && strings.Contains(errText, "collection") { - return makeGenericMgmtError(ErrCollectionNotFound, req, resp, string(b)) - } else if strings.Contains(errText, "not found") && strings.Contains(errText, "scope") { - return makeGenericMgmtError(ErrScopeNotFound, req, resp, string(b)) - } - - if strings.Contains(errText, "already exists") && strings.Contains(errText, "collection") { - return makeGenericMgmtError(ErrCollectionExists, req, resp, string(b)) - } else if strings.Contains(errText, "already exists") && strings.Contains(errText, "scope") { - return makeGenericMgmtError(ErrScopeExists, req, resp, string(b)) - } - - return makeGenericMgmtError(errors.New(errText), req, resp, string(b)) + getProvider func() (collectionsManagementProvider, error) } // GetAllScopesOptions is the set of options available to the GetAllScopes operation. @@ -98,84 +41,12 @@ func (cm *CollectionManager) GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpe opts = &GetAllScopesOptions{} } - start := time.Now() - defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_get_all_scopes", start) - - path := fmt.Sprintf("/pools/default/buckets/%s/scopes", cm.bucketName) - span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_get_all_scopes", "management") - span.SetAttribute("db.name", cm.bucketName) - span.SetAttribute("db.operation", "GET "+path) - defer span.End() - - req := mgmtRequest{ - Service: ServiceTypeManagement, - Path: path, - Method: "GET", - RetryStrategy: opts.RetryStrategy, - IsIdempotent: true, - UniqueID: uuid.New().String(), - Timeout: opts.Timeout, - parentSpanCtx: span.Context(), - } - - resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + provider, err := cm.getProvider() if err != nil { - return nil, makeMgmtBadStatusError("failed to get all scopes", &req, resp) - } - defer ensureBodyClosed(resp.Body) - - if resp.StatusCode != 200 { - colErr := cm.tryParseErrorMessage(&req, resp) - if colErr != nil { - return nil, colErr - } - return nil, makeMgmtBadStatusError("failed to get all scopes", &req, resp) - } - - var scopes []ScopeSpec - var mfest gocbcore.Manifest - jsonDec := json.NewDecoder(resp.Body) - err = jsonDec.Decode(&mfest) - if err == nil { - for _, scope := range mfest.Scopes { - var collections []CollectionSpec - for _, col := range scope.Collections { - collections = append(collections, CollectionSpec{ - Name: col.Name, - ScopeName: scope.Name, - MaxExpiry: time.Duration(col.MaxTTL) * time.Second, - }) - } - scopes = append(scopes, ScopeSpec{ - Name: scope.Name, - Collections: collections, - }) - } - } else { - // Temporary support for older server version - var oldMfest jsonManifest - jsonDec := json.NewDecoder(resp.Body) - err = jsonDec.Decode(&oldMfest) - if err != nil { - return nil, err - } - - for scopeName, scope := range oldMfest.Scopes { - var collections []CollectionSpec - for colName := range scope.Collections { - collections = append(collections, CollectionSpec{ - Name: colName, - ScopeName: scopeName, - }) - } - scopes = append(scopes, ScopeSpec{ - Name: scopeName, - Collections: collections, - }) - } + return nil, err } - return scopes, nil + return provider.GetAllScopes(opts) } // CreateCollectionOptions is the set of options available to the CreateCollection operation. @@ -204,60 +75,12 @@ func (cm *CollectionManager) CreateCollection(spec CollectionSpec, opts *CreateC opts = &CreateCollectionOptions{} } - start := time.Now() - defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_collection", start) - - path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections", cm.bucketName, spec.ScopeName) - span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_collection", "management") - span.SetAttribute("db.name", cm.bucketName) - span.SetAttribute("db.couchbase.scope", spec.ScopeName) - span.SetAttribute("db.couchbase.collection", spec.Name) - span.SetAttribute("db.operation", "POST "+path) - defer span.End() - - posts := url.Values{} - posts.Add("name", spec.Name) - - if spec.MaxExpiry > 0 { - posts.Add("maxTTL", fmt.Sprintf("%d", int(spec.MaxExpiry.Seconds()))) - } - - eSpan := createSpan(cm.tracer, span, "request_encoding", "") - encoded := posts.Encode() - eSpan.End() - - req := mgmtRequest{ - Service: ServiceTypeManagement, - Path: path, - Method: "POST", - Body: []byte(encoded), - ContentType: "application/x-www-form-urlencoded", - RetryStrategy: opts.RetryStrategy, - UniqueID: uuid.New().String(), - Timeout: opts.Timeout, - parentSpanCtx: span.Context(), - } - - resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) - if err != nil { - return makeGenericMgmtError(err, &req, resp, "") - } - defer ensureBodyClosed(resp.Body) - - if resp.StatusCode != 200 { - colErr := cm.tryParseErrorMessage(&req, resp) - if colErr != nil { - return colErr - } - return makeMgmtBadStatusError("failed to create collection", &req, resp) - } - - err = resp.Body.Close() + provider, err := cm.getProvider() if err != nil { - logDebugf("Failed to close socket (%s)", err) + return err } - return nil + return provider.CreateCollection(spec, opts) } // DropCollectionOptions is the set of options available to the DropCollection operation. @@ -286,47 +109,12 @@ func (cm *CollectionManager) DropCollection(spec CollectionSpec, opts *DropColle opts = &DropCollectionOptions{} } - start := time.Now() - defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_collection", start) - - path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections/%s", cm.bucketName, spec.ScopeName, spec.Name) - span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_collection", "management") - span.SetAttribute("db.name", cm.bucketName) - span.SetAttribute("db.couchbase.scope", spec.ScopeName) - span.SetAttribute("db.couchbase.collection", spec.Name) - span.SetAttribute("db.operation", "DELETE "+path) - defer span.End() - - req := mgmtRequest{ - Service: ServiceTypeManagement, - Path: path, - Method: "DELETE", - RetryStrategy: opts.RetryStrategy, - UniqueID: uuid.New().String(), - Timeout: opts.Timeout, - parentSpanCtx: span.Context(), - } - - resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + provider, err := cm.getProvider() if err != nil { - return makeGenericMgmtError(err, &req, resp, "") - } - defer ensureBodyClosed(resp.Body) - - if resp.StatusCode != 200 { - colErr := cm.tryParseErrorMessage(&req, resp) - if colErr != nil { - return colErr - } - return makeMgmtBadStatusError("failed to drop collection", &req, resp) + return err } - err = resp.Body.Close() - if err != nil { - logDebugf("Failed to close socket (%s)", err) - } - - return nil + return provider.DropCollection(spec, opts) } // CreateScopeOptions is the set of options available to the CreateScope operation. @@ -351,55 +139,12 @@ func (cm *CollectionManager) CreateScope(scopeName string, opts *CreateScopeOpti opts = &CreateScopeOptions{} } - start := time.Now() - defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_scope", start) - - path := fmt.Sprintf("/pools/default/buckets/%s/scopes", cm.bucketName) - span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_scope", "management") - span.SetAttribute("db.name", cm.bucketName) - span.SetAttribute("db.couchbase.scope", scopeName) - span.SetAttribute("db.operation", "POST "+path) - defer span.End() - - posts := url.Values{} - posts.Add("name", scopeName) - - eSpan := createSpan(cm.tracer, span, "request_encoding", "") - encoded := posts.Encode() - eSpan.End() - - req := mgmtRequest{ - Service: ServiceTypeManagement, - Path: path, - Method: "POST", - Body: []byte(encoded), - ContentType: "application/x-www-form-urlencoded", - RetryStrategy: opts.RetryStrategy, - UniqueID: uuid.New().String(), - Timeout: opts.Timeout, - parentSpanCtx: span.Context(), - } - - resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + provider, err := cm.getProvider() if err != nil { - return makeGenericMgmtError(err, &req, resp, "") - } - defer ensureBodyClosed(resp.Body) - - if resp.StatusCode != 200 { - colErr := cm.tryParseErrorMessage(&req, resp) - if colErr != nil { - return colErr - } - return makeMgmtBadStatusError("failed to create scope", &req, resp) - } - - err = resp.Body.Close() - if err != nil { - logDebugf("Failed to close socket (%s)", err) + return err } - return nil + return provider.CreateScope(scopeName, opts) } // DropScopeOptions is the set of options available to the DropScope operation. @@ -416,48 +161,18 @@ type DropScopeOptions struct { // DropScope removes a scope. func (cm *CollectionManager) DropScope(scopeName string, opts *DropScopeOptions) error { - if opts == nil { - opts = &DropScopeOptions{} - } - - start := time.Now() - defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_scope", start) - - path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s", cm.bucketName, scopeName) - span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_scope", "management") - span.SetAttribute("db.name", cm.bucketName) - span.SetAttribute("db.couchbase.scope", scopeName) - span.SetAttribute("db.operation", "DELETE "+path) - defer span.End() - - req := mgmtRequest{ - Service: ServiceTypeManagement, - Path: path, - Method: "DELETE", - RetryStrategy: opts.RetryStrategy, - UniqueID: uuid.New().String(), - Timeout: opts.Timeout, - parentSpanCtx: span.Context(), + if scopeName == "" { + return makeInvalidArgumentsError("scope name cannot be empty") } - resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) - if err != nil { - return makeGenericMgmtError(err, &req, resp, "") - } - defer ensureBodyClosed(resp.Body) - - if resp.StatusCode != 200 { - colErr := cm.tryParseErrorMessage(&req, resp) - if colErr != nil { - return colErr - } - return makeMgmtBadStatusError("failed to drop scope", &req, resp) + if opts == nil { + opts = &DropScopeOptions{} } - err = resp.Body.Close() + provider, err := cm.getProvider() if err != nil { - logDebugf("Failed to close socket (%s)", err) + return err } - return nil + return provider.DropScope(scopeName, opts) } diff --git a/bucket_collectionsmgr_test.go b/bucket_collectionsmgr_test.go index 98c38b2..f06371c 100644 --- a/bucket_collectionsmgr_test.go +++ b/bucket_collectionsmgr_test.go @@ -100,58 +100,90 @@ func (suite *IntegrationTestSuite) TestCollectionManagerCrud() { suite.Require().Contains(globalTracer.GetSpans(), nil) nilParents := globalTracer.GetSpans()[nil] suite.Require().Equal(9, len(nilParents)) + + isProtostellar := globalCluster.IsProtostellar() + var operationID string + var numDispatchSpans int + if isProtostellar { + operationID = "CreateScope" + numDispatchSpans = 0 + } else { + operationID = "POST " + fmt.Sprintf("/pools/default/buckets/%s/scopes", globalConfig.Bucket) + numDispatchSpans = 1 + } + suite.AssertHTTPOpSpan(nilParents[0], "manager_collections_create_scope", HTTPOpSpanExpectations{ bucket: globalConfig.Bucket, scope: scopeName, service: "management", - operationID: "POST " + fmt.Sprintf("/pools/default/buckets/%s/scopes", globalConfig.Bucket), - numDispatchSpans: 1, + operationID: operationID, + numDispatchSpans: numDispatchSpans, atLeastNumDispatchSpans: false, - hasEncoding: true, + hasEncoding: !isProtostellar, dispatchOperationID: "any", }) + if isProtostellar { + operationID = "CreateCollection" + } else { + operationID = "POST " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections", globalConfig.Bucket, scopeName) + } suite.AssertHTTPOpSpan(nilParents[2], "manager_collections_create_collection", HTTPOpSpanExpectations{ bucket: globalConfig.Bucket, scope: scopeName, collection: collectionName, service: "management", - operationID: "POST " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections", globalConfig.Bucket, scopeName), - numDispatchSpans: 1, + operationID: operationID, + numDispatchSpans: numDispatchSpans, atLeastNumDispatchSpans: false, - hasEncoding: true, + hasEncoding: !isProtostellar, dispatchOperationID: "any", }) + if isProtostellar { + operationID = "ListCollections" + } else { + operationID = "GET " + fmt.Sprintf("/pools/default/buckets/%s/scopes", globalConfig.Bucket) + } suite.AssertHTTPOpSpan(nilParents[4], "manager_collections_get_all_scopes", HTTPOpSpanExpectations{ bucket: globalConfig.Bucket, service: "management", - operationID: "GET " + fmt.Sprintf("/pools/default/buckets/%s/scopes", globalConfig.Bucket), - numDispatchSpans: 1, + operationID: operationID, + numDispatchSpans: numDispatchSpans, atLeastNumDispatchSpans: false, hasEncoding: false, dispatchOperationID: "any", }) + if isProtostellar { + operationID = "DeleteCollection" + } else { + operationID = "DELETE " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections/%s", globalConfig.Bucket, scopeName, collectionName) + } suite.AssertHTTPOpSpan(nilParents[5], "manager_collections_drop_collection", HTTPOpSpanExpectations{ bucket: globalConfig.Bucket, scope: scopeName, collection: collectionName, service: "management", - operationID: "DELETE " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections/%s", globalConfig.Bucket, scopeName, collectionName), - numDispatchSpans: 1, + operationID: operationID, + numDispatchSpans: numDispatchSpans, atLeastNumDispatchSpans: false, hasEncoding: false, dispatchOperationID: "any", }) + if isProtostellar { + operationID = "DeleteScope" + } else { + operationID = "DELETE " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s", globalConfig.Bucket, scopeName) + } suite.AssertHTTPOpSpan(nilParents[7], "manager_collections_drop_scope", HTTPOpSpanExpectations{ bucket: globalConfig.Bucket, scope: scopeName, service: "management", - operationID: "DELETE " + fmt.Sprintf("/pools/default/buckets/%s/scopes/%s", globalConfig.Bucket, scopeName), - numDispatchSpans: 1, + operationID: operationID, + numDispatchSpans: numDispatchSpans, atLeastNumDispatchSpans: false, hasEncoding: false, dispatchOperationID: "any", @@ -549,7 +581,6 @@ func (suite *IntegrationTestSuite) TestNumberOfCollectionInScope() { } func (suite *IntegrationTestSuite) TestMaxNumberOfCollectionInScope() { - suite.T().Skip() suite.skipIfUnsupported(CollectionsFeature) suite.skipIfUnsupported(CollectionsManagerFeature) suite.skipIfUnsupported(CollectionsManagerMaxCollectionsFeature) @@ -602,9 +633,13 @@ func (suite *UnitTestSuite) TestGetAllScopesMgmtRequestFails() { provider.On("executeMgmtRequest", nil, mock.AnythingOfType("gocb.mgmtRequest")).Return(nil, errors.New("http send failure")) mgr := CollectionManager{ - mgmtProvider: provider, - tracer: &NoopTracer{}, - meter: &meterWrapper{meter: &NoopMeter{}, isNoopMeter: true}, + getProvider: func() (collectionsManagementProvider, error) { + return &collectionsManagementProviderCore{ + mgmtProvider: provider, + tracer: &NoopTracer{}, + meter: &meterWrapper{meter: &NoopMeter{}, isNoopMeter: true}, + }, nil + }, } scopes, err := mgr.GetAllScopes(nil) diff --git a/client.go b/client.go index bf272d9..81df561 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,9 @@ type connectionManager interface { connect() error openBucket(bucketName string) error buildConfig(cluster *Cluster) error + connection(bucketName string) (*gocbcore.Agent, error) + close() error + getKvProvider(bucketName string) (kvProvider, error) getKvCapabilitiesProvider(bucketName string) (kvCapabilityVerifier, error) getViewProvider(bucketName string) (viewProvider, error) @@ -16,8 +19,7 @@ type connectionManager interface { getHTTPProvider(bucketName string) (httpProvider, error) getDiagnosticsProvider(bucketName string) (diagnosticsProvider, error) getWaitUntilReadyProvider(bucketName string) (waitUntilReadyProvider, error) - connection(bucketName string) (*gocbcore.Agent, error) - close() error + getCollectionsManagementProvider(bucketName string) (collectionsManagementProvider, error) } func (c *Cluster) newConnectionMgr(protocol string) connectionManager { diff --git a/client_core.go b/client_core.go index 8369938..fdf84c1 100644 --- a/client_core.go +++ b/client_core.go @@ -280,6 +280,24 @@ func (c *stdConnectionMgr) getWaitUntilReadyProvider(bucketName string) (waitUnt }, nil } +func (c *stdConnectionMgr) getCollectionsManagementProvider(bucketName string) (collectionsManagementProvider, error) { + provider, err := c.getHTTPProvider(bucketName) + if err != nil { + return nil, err + } + + return &collectionsManagementProviderCore{ + mgmtProvider: &mgmtProviderCore{ + provider: provider, + mgmtTimeout: c.timeouts.ManagementTimeout, + retryStrategyWrapper: c.retryStrategyWrapper, + }, + bucketName: bucketName, + tracer: c.tracer, + meter: c.meter, + }, nil +} + func (c *stdConnectionMgr) connection(bucketName string) (*gocbcore.Agent, error) { if c.agentgroup == nil { return nil, errors.New("cluster not yet connected") diff --git a/client_ps.go b/client_ps.go index 3143217..d31d073 100644 --- a/client_ps.go +++ b/client_ps.go @@ -85,6 +85,16 @@ func (c *psConnectionMgr) getQueryIndexProvider() (queryIndexProvider, error) { }, nil } +func (c *psConnectionMgr) getCollectionsManagementProvider(bucketName string) (collectionsManagementProvider, error) { + return &collectionsManagementProviderPs{ + provider: c.agent.CollectionV1(), + bucketName: bucketName, + defaultTimeout: c.timeouts.ManagementTimeout, + tracer: c.tracer, + meter: c.meter, + }, nil +} + func (c *psConnectionMgr) getAnalyticsProvider() (analyticsProvider, error) { return &analyticsProviderWrapper{}, ErrFeatureNotAvailable } diff --git a/collectionsmgmtprovider.go b/collectionsmgmtprovider.go new file mode 100644 index 0000000..a762cb1 --- /dev/null +++ b/collectionsmgmtprovider.go @@ -0,0 +1,9 @@ +package gocb + +type collectionsManagementProvider interface { + GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpec, error) + CreateCollection(spec CollectionSpec, opts *CreateCollectionOptions) error + DropCollection(spec CollectionSpec, opts *DropCollectionOptions) error + CreateScope(scopeName string, opts *CreateScopeOptions) error + DropScope(scopeName string, opts *DropScopeOptions) error +} diff --git a/collectionsmgmtprovider_core.go b/collectionsmgmtprovider_core.go new file mode 100644 index 0000000..8606fd6 --- /dev/null +++ b/collectionsmgmtprovider_core.go @@ -0,0 +1,382 @@ +package gocb + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/url" + "strings" + "time" + + gocbcore "github.com/couchbase/gocbcore/v10" + "github.com/google/uuid" +) + +type collectionsManagementProviderCore struct { + mgmtProvider mgmtProvider + bucketName string + tracer RequestTracer + meter *meterWrapper +} + +func (cm *collectionsManagementProviderCore) GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpec, error) { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_get_all_scopes", start) + + path := fmt.Sprintf("/pools/default/buckets/%s/scopes", cm.bucketName) + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_get_all_scopes", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.operation", "GET "+path) + defer span.End() + + req := mgmtRequest{ + Service: ServiceTypeManagement, + Path: path, + Method: "GET", + RetryStrategy: opts.RetryStrategy, + IsIdempotent: true, + UniqueID: uuid.New().String(), + Timeout: opts.Timeout, + parentSpanCtx: span.Context(), + } + + resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + if err != nil { + return nil, makeMgmtBadStatusError("failed to get all scopes", &req, resp) + } + defer ensureBodyClosed(resp.Body) + + if resp.StatusCode != 200 { + colErr := cm.tryParseErrorMessage(&req, resp) + if colErr != nil { + return nil, colErr + } + return nil, makeMgmtBadStatusError("failed to get all scopes", &req, resp) + } + + var scopes []ScopeSpec + var mfest gocbcore.Manifest + jsonDec := json.NewDecoder(resp.Body) + err = jsonDec.Decode(&mfest) + if err == nil { + for _, scope := range mfest.Scopes { + var collections []CollectionSpec + for _, col := range scope.Collections { + collections = append(collections, CollectionSpec{ + Name: col.Name, + ScopeName: scope.Name, + MaxExpiry: time.Duration(col.MaxTTL) * time.Second, + }) + } + scopes = append(scopes, ScopeSpec{ + Name: scope.Name, + Collections: collections, + }) + } + } else { + // Temporary support for older server version + var oldMfest jsonManifest + jsonDec := json.NewDecoder(resp.Body) + err = jsonDec.Decode(&oldMfest) + if err != nil { + return nil, err + } + + for scopeName, scope := range oldMfest.Scopes { + var collections []CollectionSpec + for colName := range scope.Collections { + collections = append(collections, CollectionSpec{ + Name: colName, + ScopeName: scopeName, + }) + } + scopes = append(scopes, ScopeSpec{ + Name: scopeName, + Collections: collections, + }) + } + } + + return scopes, nil +} + +// CreateCollection creates a new collection on the bucket. +func (cm *collectionsManagementProviderCore) CreateCollection(spec CollectionSpec, opts *CreateCollectionOptions) error { + if spec.Name == "" { + return makeInvalidArgumentsError("collection name cannot be empty") + } + + if spec.ScopeName == "" { + return makeInvalidArgumentsError("scope name cannot be empty") + } + + if opts == nil { + opts = &CreateCollectionOptions{} + } + + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_collection", start) + + path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections", cm.bucketName, spec.ScopeName) + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_collection", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", spec.ScopeName) + span.SetAttribute("db.couchbase.collection", spec.Name) + span.SetAttribute("db.operation", "POST "+path) + defer span.End() + + posts := url.Values{} + posts.Add("name", spec.Name) + + if spec.MaxExpiry > 0 { + posts.Add("maxTTL", fmt.Sprintf("%d", int(spec.MaxExpiry.Seconds()))) + } + + eSpan := createSpan(cm.tracer, span, "request_encoding", "") + encoded := posts.Encode() + eSpan.End() + + req := mgmtRequest{ + Service: ServiceTypeManagement, + Path: path, + Method: "POST", + Body: []byte(encoded), + ContentType: "application/x-www-form-urlencoded", + RetryStrategy: opts.RetryStrategy, + UniqueID: uuid.New().String(), + Timeout: opts.Timeout, + parentSpanCtx: span.Context(), + } + + resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + if err != nil { + return makeGenericMgmtError(err, &req, resp, "") + } + defer ensureBodyClosed(resp.Body) + + if resp.StatusCode != 200 { + colErr := cm.tryParseErrorMessage(&req, resp) + if colErr != nil { + return colErr + } + return makeMgmtBadStatusError("failed to create collection", &req, resp) + } + + err = resp.Body.Close() + if err != nil { + logDebugf("Failed to close socket (%s)", err) + } + + return nil +} + +// DropCollection removes a collection. +func (cm *collectionsManagementProviderCore) DropCollection(spec CollectionSpec, opts *DropCollectionOptions) error { + if spec.Name == "" { + return makeInvalidArgumentsError("collection name cannot be empty") + } + + if spec.ScopeName == "" { + return makeInvalidArgumentsError("scope name cannot be empty") + } + + if opts == nil { + opts = &DropCollectionOptions{} + } + + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_collection", start) + + path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s/collections/%s", cm.bucketName, spec.ScopeName, spec.Name) + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_collection", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", spec.ScopeName) + span.SetAttribute("db.couchbase.collection", spec.Name) + span.SetAttribute("db.operation", "DELETE "+path) + defer span.End() + + req := mgmtRequest{ + Service: ServiceTypeManagement, + Path: path, + Method: "DELETE", + RetryStrategy: opts.RetryStrategy, + UniqueID: uuid.New().String(), + Timeout: opts.Timeout, + parentSpanCtx: span.Context(), + } + + resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + if err != nil { + return makeGenericMgmtError(err, &req, resp, "") + } + defer ensureBodyClosed(resp.Body) + + if resp.StatusCode != 200 { + colErr := cm.tryParseErrorMessage(&req, resp) + if colErr != nil { + return colErr + } + return makeMgmtBadStatusError("failed to drop collection", &req, resp) + } + + err = resp.Body.Close() + if err != nil { + logDebugf("Failed to close socket (%s)", err) + } + + return nil +} + +// CreateScope creates a new scope on the bucket. +func (cm *collectionsManagementProviderCore) CreateScope(scopeName string, opts *CreateScopeOptions) error { + if scopeName == "" { + return makeInvalidArgumentsError("scope name cannot be empty") + } + + if opts == nil { + opts = &CreateScopeOptions{} + } + + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_scope", start) + + path := fmt.Sprintf("/pools/default/buckets/%s/scopes", cm.bucketName) + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_scope", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", scopeName) + span.SetAttribute("db.operation", "POST "+path) + defer span.End() + + posts := url.Values{} + posts.Add("name", scopeName) + + eSpan := createSpan(cm.tracer, span, "request_encoding", "") + encoded := posts.Encode() + eSpan.End() + + req := mgmtRequest{ + Service: ServiceTypeManagement, + Path: path, + Method: "POST", + Body: []byte(encoded), + ContentType: "application/x-www-form-urlencoded", + RetryStrategy: opts.RetryStrategy, + UniqueID: uuid.New().String(), + Timeout: opts.Timeout, + parentSpanCtx: span.Context(), + } + + resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + if err != nil { + return makeGenericMgmtError(err, &req, resp, "") + } + defer ensureBodyClosed(resp.Body) + + if resp.StatusCode != 200 { + colErr := cm.tryParseErrorMessage(&req, resp) + if colErr != nil { + return colErr + } + return makeMgmtBadStatusError("failed to create scope", &req, resp) + } + + err = resp.Body.Close() + if err != nil { + logDebugf("Failed to close socket (%s)", err) + } + + return nil +} + +// DropScope removes a scope. +func (cm *collectionsManagementProviderCore) DropScope(scopeName string, opts *DropScopeOptions) error { + if opts == nil { + opts = &DropScopeOptions{} + } + + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_scope", start) + + path := fmt.Sprintf("/pools/default/buckets/%s/scopes/%s", cm.bucketName, scopeName) + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_scope", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", scopeName) + span.SetAttribute("db.operation", "DELETE "+path) + defer span.End() + + req := mgmtRequest{ + Service: ServiceTypeManagement, + Path: path, + Method: "DELETE", + RetryStrategy: opts.RetryStrategy, + UniqueID: uuid.New().String(), + Timeout: opts.Timeout, + parentSpanCtx: span.Context(), + } + + resp, err := cm.mgmtProvider.executeMgmtRequest(opts.Context, req) + if err != nil { + return makeGenericMgmtError(err, &req, resp, "") + } + defer ensureBodyClosed(resp.Body) + + if resp.StatusCode != 200 { + colErr := cm.tryParseErrorMessage(&req, resp) + if colErr != nil { + return colErr + } + return makeMgmtBadStatusError("failed to drop scope", &req, resp) + } + + err = resp.Body.Close() + if err != nil { + logDebugf("Failed to close socket (%s)", err) + } + + return nil +} + +func (cm *collectionsManagementProviderCore) tryParseErrorMessage(req *mgmtRequest, resp *mgmtResponse) error { + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + logDebugf("failed to read http body: %s", err) + return nil + } + + errText := strings.ToLower(string(b)) + + if err := checkForRateLimitError(resp.StatusCode, errText); err != nil { + return makeGenericMgmtError(err, req, resp, string(b)) + } + + if strings.Contains(errText, "not found") && strings.Contains(errText, "collection") { + return makeGenericMgmtError(ErrCollectionNotFound, req, resp, string(b)) + } else if strings.Contains(errText, "not found") && strings.Contains(errText, "scope") { + return makeGenericMgmtError(ErrScopeNotFound, req, resp, string(b)) + } + + if strings.Contains(errText, "already exists") && strings.Contains(errText, "collection") { + return makeGenericMgmtError(ErrCollectionExists, req, resp, string(b)) + } else if strings.Contains(errText, "already exists") && strings.Contains(errText, "scope") { + return makeGenericMgmtError(ErrScopeExists, req, resp, string(b)) + } + + return makeGenericMgmtError(errors.New(errText), req, resp, string(b)) +} + +// These 3 types are temporary. They are necessary for now as the server beta was released with ns_server returning +// a different jsonManifest format to what it will return in the future. +type jsonManifest struct { + UID uint64 `json:"uid"` + Scopes map[string]jsonManifestScope `json:"scopes"` +} + +type jsonManifestScope struct { + UID uint32 `json:"uid"` + Collections map[string]jsonManifestCollection `json:"collections"` +} + +type jsonManifestCollection struct { + UID uint32 `json:"uid"` +} diff --git a/collectionsmgmtprovider_ps.go b/collectionsmgmtprovider_ps.go new file mode 100644 index 0000000..dbdaf57 --- /dev/null +++ b/collectionsmgmtprovider_ps.go @@ -0,0 +1,260 @@ +package gocb + +import ( + "context" + "time" + + "google.golang.org/grpc/status" + + "github.com/couchbase/goprotostellar/genproto/admin_collection_v1" +) + +type collectionsManagementProviderPs struct { + provider admin_collection_v1.CollectionAdminServiceClient + + defaultTimeout time.Duration + bucketName string + tracer RequestTracer + meter *meterWrapper +} + +func (cm *collectionsManagementProviderPs) GetAllScopes(opts *GetAllScopesOptions) ([]ScopeSpec, error) { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_get_all_scopes", start) + + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_get_all_scopes", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.operation", "ListCollections") + defer span.End() + + req := &admin_collection_v1.ListCollectionsRequest{ + BucketName: cm.bucketName, + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = cm.defaultTimeout + } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + resp, err := cm.provider.ListCollections(ctx, req) + if err != nil { + st, ok := status.FromError(err) + if !ok { + return nil, makeGenericMgmtError(err, nil, nil, err.Error()) + } + gocbErr := tryMapPsErrorStatusToGocbError(st, true) + if gocbErr == nil { + gocbErr = err + } + + return nil, makeGenericMgmtError(gocbErr, nil, nil, err.Error()) + } + + var scopes []ScopeSpec + for _, scope := range resp.GetScopes() { + var collections []CollectionSpec + for _, col := range scope.Collections { + collections = append(collections, CollectionSpec{ + Name: col.Name, + ScopeName: scope.Name, + MaxExpiry: time.Duration(col.GetMaxExpirySecs()) * time.Second, + }) + } + scopes = append(scopes, ScopeSpec{ + Name: scope.Name, + Collections: collections, + }) + } + + return scopes, nil +} + +// CreateCollection creates a new collection on the bucket. +func (cm *collectionsManagementProviderPs) CreateCollection(spec CollectionSpec, opts *CreateCollectionOptions) error { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_collection", start) + + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_collection", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", spec.ScopeName) + span.SetAttribute("db.couchbase.collection", spec.Name) + span.SetAttribute("db.operation", "CreateCollection") + defer span.End() + + req := &admin_collection_v1.CreateCollectionRequest{ + BucketName: cm.bucketName, + ScopeName: spec.ScopeName, + CollectionName: spec.Name, + } + if spec.MaxExpiry > 0 { + expiry := uint32(spec.MaxExpiry.Seconds()) + req.MaxExpirySecs = &expiry + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = cm.defaultTimeout + } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + _, err := cm.provider.CreateCollection(ctx, req) + if err != nil { + st, ok := status.FromError(err) + if !ok { + return makeGenericMgmtError(err, nil, nil, err.Error()) + } + gocbErr := tryMapPsErrorStatusToGocbError(st, false) + if gocbErr == nil { + gocbErr = err + } + + return makeGenericMgmtError(gocbErr, nil, nil, err.Error()) + } + + return nil +} + +// DropCollection removes a collection. +func (cm *collectionsManagementProviderPs) DropCollection(spec CollectionSpec, opts *DropCollectionOptions) error { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_collection", start) + + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_collection", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", spec.ScopeName) + span.SetAttribute("db.couchbase.collection", spec.Name) + span.SetAttribute("db.operation", "DeleteCollection") + defer span.End() + + req := &admin_collection_v1.DeleteCollectionRequest{ + BucketName: cm.bucketName, + ScopeName: spec.ScopeName, + CollectionName: spec.Name, + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = cm.defaultTimeout + } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + _, err := cm.provider.DeleteCollection(ctx, req) + if err != nil { + st, ok := status.FromError(err) + if !ok { + return makeGenericMgmtError(err, nil, nil, err.Error()) + } + gocbErr := tryMapPsErrorStatusToGocbError(st, false) + if gocbErr == nil { + gocbErr = err + } + + return makeGenericMgmtError(gocbErr, nil, nil, err.Error()) + } + + return nil +} + +// CreateScope creates a new scope on the bucket. +func (cm *collectionsManagementProviderPs) CreateScope(scopeName string, opts *CreateScopeOptions) error { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_create_scope", start) + + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_create_scope", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", scopeName) + span.SetAttribute("db.operation", "CreateScope") + defer span.End() + + req := &admin_collection_v1.CreateScopeRequest{ + BucketName: cm.bucketName, + ScopeName: scopeName, + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = cm.defaultTimeout + } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + _, err := cm.provider.CreateScope(ctx, req) + if err != nil { + st, ok := status.FromError(err) + if !ok { + return makeGenericMgmtError(err, nil, nil, err.Error()) + } + gocbErr := tryMapPsErrorStatusToGocbError(st, false) + if gocbErr == nil { + gocbErr = err + } + + return makeGenericMgmtError(gocbErr, nil, nil, err.Error()) + } + + return nil +} + +// DropScope removes a scope. +func (cm *collectionsManagementProviderPs) DropScope(scopeName string, opts *DropScopeOptions) error { + start := time.Now() + defer cm.meter.ValueRecord(meterValueServiceManagement, "manager_collections_drop_scope", start) + + span := createSpan(cm.tracer, opts.ParentSpan, "manager_collections_drop_scope", "management") + span.SetAttribute("db.name", cm.bucketName) + span.SetAttribute("db.couchbase.scope", scopeName) + span.SetAttribute("db.operation", "DeleteScope") + defer span.End() + + req := &admin_collection_v1.DeleteScopeRequest{ + BucketName: cm.bucketName, + ScopeName: scopeName, + } + + timeout := opts.Timeout + if timeout == 0 { + timeout = cm.defaultTimeout + } + ctx := opts.Context + if ctx == nil { + ctx = context.Background() + } + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + _, err := cm.provider.DeleteScope(ctx, req) + if err != nil { + st, ok := status.FromError(err) + if !ok { + return makeGenericMgmtError(err, nil, nil, err.Error()) + } + gocbErr := tryMapPsErrorStatusToGocbError(st, false) + if gocbErr == nil { + gocbErr = err + } + + return makeGenericMgmtError(gocbErr, nil, nil, err.Error()) + } + + return nil +} diff --git a/mgmt_http.go b/mgmt_http.go index 11745de..4907cf3 100644 --- a/mgmt_http.go +++ b/mgmt_http.go @@ -35,6 +35,50 @@ type mgmtProvider interface { executeMgmtRequest(ctx context.Context, req mgmtRequest) (*mgmtResponse, error) } +type mgmtProviderCore struct { + provider httpProvider + mgmtTimeout time.Duration + retryStrategyWrapper *retryStrategyWrapper +} + +func (mpc *mgmtProviderCore) executeMgmtRequest(ctx context.Context, req mgmtRequest) (mgmtRespOut *mgmtResponse, errOut error) { + timeout := req.Timeout + if timeout == 0 { + timeout = mpc.mgmtTimeout + } + + retryStrategy := mpc.retryStrategyWrapper + if req.RetryStrategy != nil { + retryStrategy = newRetryStrategyWrapper(req.RetryStrategy) + } + + corereq := &gocbcore.HTTPRequest{ + Service: gocbcore.ServiceType(req.Service), + Method: req.Method, + Path: req.Path, + Body: req.Body, + Headers: req.Headers, + ContentType: req.ContentType, + IsIdempotent: req.IsIdempotent, + UniqueID: req.UniqueID, + Deadline: time.Now().Add(timeout), + RetryStrategy: retryStrategy, + TraceContext: req.parentSpanCtx, + } + + coreresp, err := mpc.provider.DoHTTPRequest(ctx, corereq) + if err != nil { + return nil, makeGenericHTTPError(err, corereq, coreresp) + } + + resp := &mgmtResponse{ + Endpoint: coreresp.Endpoint, + StatusCode: uint32(coreresp.StatusCode), + Body: coreresp.Body, + } + return resp, nil +} + func (c *Cluster) executeMgmtRequest(ctx context.Context, req mgmtRequest) (mgmtRespOut *mgmtResponse, errOut error) { timeout := req.Timeout if timeout == 0 { diff --git a/mock_connectionManager_test.go b/mock_connectionManager_test.go index 48672dd..f60a03f 100644 --- a/mock_connectionManager_test.go +++ b/mock_connectionManager_test.go @@ -106,6 +106,32 @@ func (_m *mockConnectionManager) getAnalyticsProvider() (analyticsProvider, erro return r0, r1 } +// getCollectionsManagementProvider provides a mock function with given fields: bucketName +func (_m *mockConnectionManager) getCollectionsManagementProvider(bucketName string) (collectionsManagementProvider, error) { + ret := _m.Called(bucketName) + + var r0 collectionsManagementProvider + var r1 error + if rf, ok := ret.Get(0).(func(string) (collectionsManagementProvider, error)); ok { + return rf(bucketName) + } + if rf, ok := ret.Get(0).(func(string) collectionsManagementProvider); ok { + r0 = rf(bucketName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(collectionsManagementProvider) + } + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(bucketName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // getDiagnosticsProvider provides a mock function with given fields: bucketName func (_m *mockConnectionManager) getDiagnosticsProvider(bucketName string) (diagnosticsProvider, error) { ret := _m.Called(bucketName)