diff --git a/bucket.go b/bucket.go index 68cccfaf..dbb8c5a1 100644 --- a/bucket.go +++ b/bucket.go @@ -259,6 +259,11 @@ func (b *Bucket) getViewEp() string { return capiEps[rand.Intn(len(capiEps))] } +func (b *Bucket) getMgmtEp() string { + mgmtEps := b.client.MgmtEps() + return mgmtEps[rand.Intn(len(mgmtEps))] +} + type viewItem struct { Bytes []byte } @@ -381,3 +386,11 @@ func (b *Bucket) ExecuteViewQuery(q *ViewQuery) ViewResults { func (b *Bucket) IoRouter() *gocbcore.Agent { return b.client } + +func (b *Bucket) Manager(username, password string) *BucketManager { + return &BucketManager{ + bucket: b, + username: username, + password: password, + } +} diff --git a/bucketmgr.go b/bucketmgr.go new file mode 100644 index 00000000..60b8ece2 --- /dev/null +++ b/bucketmgr.go @@ -0,0 +1,209 @@ +package gocb + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" +) + +type View struct { + Map string `json:"map,omitempty"` + Reduce string `json:"reduce,omitempty"` +} + +func (v View) hasReduce() bool { + return v.Reduce != "" +} + +type DesignDocument struct { + Name string `json:"-"` + Views map[string]View `json:"views,omitempty"` + SpatialViews map[string]View `json:"spatial,omitempty"` +} + +type BucketManager struct { + bucket *Bucket + username string + password string +} + +func (bm *BucketManager) capiRequest(method, uri, contentType string, body io.Reader) (*http.Response, error) { + if contentType == "" && body != nil { + panic("Content-type must be specified for non-null body.") + } + + reqUri := bm.bucket.getViewEp() + uri + req, err := http.NewRequest(method, reqUri, body) + if contentType != "" { + req.Header.Add("Content-Type", contentType) + } + if err != nil { + return nil, err + } + + req.SetBasicAuth(bm.username, bm.password) + return bm.bucket.httpCli.Do(req) +} + +func (bm *BucketManager) mgmtRequest(method, uri, contentType string, body io.Reader) (*http.Response, error) { + if contentType == "" && body != nil { + panic("Content-type must be specified for non-null body.") + } + + reqUri := bm.bucket.getMgmtEp() + uri + req, err := http.NewRequest(method, reqUri, body) + if contentType != "" { + req.Header.Add("Content-Type", contentType) + } + if err != nil { + return nil, err + } + + req.SetBasicAuth(bm.username, bm.password) + return bm.bucket.httpCli.Do(req) +} + +func (bm *BucketManager) Flush() error { + reqUri := fmt.Sprintf("/pools/default/%s/controller/doFlush", bm.bucket.name) + resp, err := bm.mgmtRequest("POST", reqUri, "", nil) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + resp.Body.Close() + return clientError{string(data)} + } + return nil +} + +func (bm *BucketManager) GetDesignDocument(name string) (*DesignDocument, error) { + reqUri := fmt.Sprintf("/_design/%s", name) + + resp, err := bm.capiRequest("GET", reqUri, "", nil) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body.Close() + return nil, clientError{string(data)} + } + + ddocObj := DesignDocument{} + jsonDec := json.NewDecoder(resp.Body) + err = jsonDec.Decode(&ddocObj) + if err != nil { + return nil, err + } + + ddocObj.Name = name + return &ddocObj, nil +} + +func (bm *BucketManager) GetDesignDocuments() ([]*DesignDocument, error) { + reqUri := fmt.Sprintf("/pools/default/buckets/%s/ddocs", bm.bucket.name) + + resp, err := bm.mgmtRequest("GET", reqUri, "", nil) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body.Close() + return nil, clientError{string(data)} + } + + var ddocsObj struct { + Rows []struct { + Doc struct { + Meta struct { + Id string + } + Json DesignDocument + } + } + } + jsonDec := json.NewDecoder(resp.Body) + err = jsonDec.Decode(&ddocsObj) + if err != nil { + return nil, err + } + + var ddocs []*DesignDocument + for _, ddocData := range ddocsObj.Rows { + ddoc := &ddocData.Doc.Json + ddoc.Name = ddocData.Doc.Meta.Id[8:] + ddocs = append(ddocs, ddoc) + } + + return ddocs, nil +} + +func (bm *BucketManager) InsertDesignDocument(ddoc *DesignDocument) error { + oldDdoc, _ := bm.GetDesignDocument(ddoc.Name) + if oldDdoc != nil { + return clientError{"Design document already exists"} + } + return bm.UpsertDesignDocument(ddoc) +} + +func (bm *BucketManager) UpsertDesignDocument(ddoc *DesignDocument) error { + reqUri := fmt.Sprintf("/_design/%s", ddoc.Name) + + data, err := json.Marshal(&ddoc) + if err != nil { + return err + } + + resp, err := bm.capiRequest("PUT", reqUri, "application/json", bytes.NewReader(data)) + if err != nil { + return err + } + + if resp.StatusCode != 201 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + resp.Body.Close() + return clientError{string(data)} + } + + return nil +} + +func (bm *BucketManager) RemoveDesignDocument(name string) error { + reqUri := fmt.Sprintf("/_design/%s", name) + + resp, err := bm.capiRequest("DELETE", reqUri, "", nil) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + resp.Body.Close() + return clientError{string(data)} + } + + return nil +} diff --git a/cluster.go b/cluster.go index bcd638fe..a196af31 100644 --- a/cluster.go +++ b/cluster.go @@ -98,6 +98,32 @@ func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) { }, nil } +func (c *Cluster) Manager(username, password string) *ClusterManager { + _, httpHosts, isSslHosts := specToHosts(c.spec) + var mgmtHosts []string + + for _, host := range httpHosts { + if isSslHosts { + mgmtHosts = append(mgmtHosts, "https://"+host) + } else { + mgmtHosts = append(mgmtHosts, "http://"+host) + } + } + + return &ClusterManager{ + hosts: mgmtHosts, + username: username, + password: password, + httpCli: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + }, + }, + } +} + type StreamingBucket struct { client *gocbcore.Agent } diff --git a/clustermgr.go b/clustermgr.go new file mode 100644 index 00000000..d8c17609 --- /dev/null +++ b/clustermgr.go @@ -0,0 +1,190 @@ +package gocb + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" +) + +type ClusterManager struct { + hosts []string + username string + password string + httpCli *http.Client +} + +type BucketType int + +const ( + Couchbase = BucketType(0) + Memcached = BucketType(1) +) + +type bucketDataIn struct { + Name string `json:"name"` + BucketType string `json:"bucketType"` + AuthType string `json:"authType"` + SaslPassword string `json:"saslPassword"` + Quota struct { + Ram int `json:"ram"` + RawRam int `json:"rawRAM"` + } `json:"quota"` + ReplicaNumber int `json:"replicaNumber"` + ReplicaIndex bool `json:"replicaIndex"` + Controllers struct { + Flush string `json:"flush"` + } `json:"controllers"` +} + +type BucketSettings struct { + FlushEnabled bool + IndexReplicas bool + Name string + Password string + Quota int + Replicas int + Type BucketType +} + +func (cm *ClusterManager) getMgmtEp() string { + return cm.hosts[rand.Intn(len(cm.hosts))] +} + +func (cm *ClusterManager) mgmtRequest(method, uri string, contentType string, body io.Reader) (*http.Response, error) { + if contentType == "" && body != nil { + panic("Content-type must be specified for non-null body.") + } + + reqUri := cm.getMgmtEp() + uri + req, err := http.NewRequest(method, reqUri, body) + if contentType != "" { + req.Header.Add("Content-Type", contentType) + } + if err != nil { + return nil, err + } + + req.SetBasicAuth(cm.username, cm.password) + return cm.httpCli.Do(req) +} + +func bucketDataInToSettings(bucketData *bucketDataIn) *BucketSettings { + settings := &BucketSettings{ + FlushEnabled: bucketData.Controllers.Flush != "", + IndexReplicas: bucketData.ReplicaIndex, + Name: bucketData.Name, + Password: bucketData.SaslPassword, + Quota: bucketData.Quota.Ram, + Replicas: bucketData.ReplicaNumber, + } + if bucketData.BucketType == "couchbase" { + settings.Type = Couchbase + } else if bucketData.BucketType == "memcached" { + settings.Type = Memcached + } else { + panic("Unrecognized bucket type string.") + } + if bucketData.AuthType != "sasl" { + settings.Password = "" + } + return settings +} + +func (cm *ClusterManager) GetBuckets() ([]*BucketSettings, error) { + resp, err := cm.mgmtRequest("GET", "/pools/default/buckets", "", nil) + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body.Close() + return nil, clientError{string(data)} + } + + var bucketsData []*bucketDataIn + jsonDec := json.NewDecoder(resp.Body) + err = jsonDec.Decode(&bucketsData) + if err != nil { + return nil, err + } + + var buckets []*BucketSettings + for _, bucketData := range bucketsData { + buckets = append(buckets, bucketDataInToSettings(bucketData)) + } + + return buckets, nil +} + +func (cm *ClusterManager) InsertBucket(settings *BucketSettings) error { + posts := url.Values{} + posts.Add("name", settings.Name) + if settings.Type == Couchbase { + posts.Add("bucketType", "couchbase") + } else if settings.Type == Memcached { + posts.Add("bucketType", "memcached") + } else { + panic("Unrecognized bucket type.") + } + if settings.FlushEnabled { + posts.Add("flushEnabled", "1") + } else { + posts.Add("flushEnabled", "0") + } + posts.Add("replicaNumber", fmt.Sprintf("%d", settings.Replicas)) + posts.Add("authType", "sasl") + posts.Add("saslPassword", settings.Password) + posts.Add("ramQuotaMB", fmt.Sprintf("%d", settings.Quota)) + posts.Add("proxyPort", "11210") + + data := []byte(posts.Encode()) + resp, err := cm.mgmtRequest("POST", "/pools/default/buckets", "application/x-www-form-urlencoded", bytes.NewReader(data)) + if err != nil { + return nil + } + + if resp.StatusCode != 202 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + resp.Body.Close() + return clientError{string(data)} + } + + return nil +} + +func (cm *ClusterManager) UpdateBucket(settings *BucketSettings) error { + // Cluster-side, updates are the same as creates. + return cm.InsertBucket(settings) +} + +func (cm *ClusterManager) RemoveBucket(name string) error { + reqUri := fmt.Sprintf("/pools/default/buckets/%s", name) + + resp, err := cm.mgmtRequest("DELETE", reqUri, "", nil) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + resp.Body.Close() + return clientError{string(data)} + } + + return nil +}