Skip to content

Commit

Permalink
GOCBC-90: Implement cluster level authentication.
Browse files Browse the repository at this point in the history
Change-Id: I6a829294b4bdedf9cecdb14aed97b3bc42dfa8aa
Reviewed-on: http://review.couchbase.org/61667
Reviewed-by: Mark Nunberg <mark.nunberg@couchbase.com>
Tested-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
brett19 committed Apr 8, 2016
1 parent 5bb7ed3 commit aabbf83
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 85 deletions.
67 changes: 67 additions & 0 deletions auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package gocb

type Authenticator interface {
clusterMgmt() userPassPair
clusterN1ql() []userPassPair
bucketMemd(bucket string) string
bucketMgmt(bucket string) userPassPair
bucketViews(bucket string) userPassPair
bucketN1ql(bucket string) []userPassPair
}

type BucketAuthenticator struct {
Password string
}

type userPassPair struct {
Username string `json:"user"`
Password string `json:"pass"`
}

type BucketAuthenticatorMap map[string]BucketAuthenticator

type ClusterAuthenticator struct {
Buckets BucketAuthenticatorMap
Username string
Password string
}

func (ca ClusterAuthenticator) clusterMgmt() userPassPair {
return userPassPair{ca.Username, ca.Password}
}

func (ca ClusterAuthenticator) clusterN1ql() []userPassPair {
userPassList := make([]userPassPair, 0)
for bucket, auth := range ca.Buckets {
userPassList = append(userPassList, userPassPair{
Username: bucket,
Password: auth.Password,
})
}
return userPassList
}

func (ca ClusterAuthenticator) bucketAll(bucket string) userPassPair {
if bucketAuth, ok := ca.Buckets[bucket]; ok {
return userPassPair{bucket, bucketAuth.Password}
}
return userPassPair{"", ""}
}

func (ca ClusterAuthenticator) bucketMemd(bucket string) string {
return ca.bucketAll(bucket).Password
}

func (ca ClusterAuthenticator) bucketMgmt(bucket string) userPassPair {
return ca.bucketAll(bucket)
}

func (ca ClusterAuthenticator) bucketViews(bucket string) userPassPair {
return ca.bucketAll(bucket)
}

func (ca ClusterAuthenticator) bucketN1ql(bucket string) []userPassPair {
return []userPassPair{
ca.bucketAll(bucket),
}
}
15 changes: 12 additions & 3 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// An interface representing a single bucket within a cluster.
type Bucket struct {
cluster *Cluster
name string
password string
client *gocbcore.Agent
Expand All @@ -26,13 +27,14 @@ type Bucket struct {
internal *bucketInternal
}

func createBucket(config *gocbcore.AgentConfig) (*Bucket, error) {
func createBucket(cluster *Cluster, config *gocbcore.AgentConfig) (*Bucket, error) {
cli, err := gocbcore.CreateAgent(config)
if err != nil {
return nil, err
}

bucket := &Bucket{
cluster: cluster,
name: config.BucketName,
password: config.Password,
client: cli,
Expand Down Expand Up @@ -135,9 +137,16 @@ func (b *Bucket) Internal() *bucketInternal {
}

func (b *Bucket) Manager(username, password string) *BucketManager {
userPass := userPassPair{username, password}
if username == "" || password == "" {
if b.cluster.auth != nil {
userPass = b.cluster.auth.bucketMgmt(b.name)
}
}

return &BucketManager{
bucket: b,
username: username,
password: password,
username: userPass.Username,
password: userPass.Password,
}
}
112 changes: 33 additions & 79 deletions bucket_http.go → bucket_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
)

type n1qlCache struct {
name string
encodedPlan string
}

type viewResponse struct {
TotalRows int `json:"total_rows,omitempty"`
Rows []json.RawMessage `json:"rows,omitempty"`
Expand Down Expand Up @@ -77,20 +73,25 @@ func (r *viewResults) One(valuePtr interface{}) error {
return nil
}

// Performs a view query and returns a list of rows or an error.
func (b *Bucket) ExecuteViewQuery(q *ViewQuery) (ViewResults, error) {
func (b *Bucket) executeViewQuery(viewType, ddoc, viewName string, options url.Values) (ViewResults, error) {
capiEp, err := b.getViewEp()
if err != nil {
return nil, err
}

reqUri := fmt.Sprintf("%s/_design/%s/_view/%s?%s", capiEp, q.ddoc, q.name, q.options.Encode())
reqUri := fmt.Sprintf("%s/_design/%s/%s/%s?%s", capiEp, ddoc, viewType, viewName, options.Encode())

req, err := http.NewRequest("GET", reqUri, nil)
if err != nil {
return nil, err
}
req.SetBasicAuth(b.name, b.password)

if b.cluster.auth != nil {
userPass := b.cluster.auth.bucketViews(b.name)
req.SetBasicAuth(userPass.Username, userPass.Password)
} else {
req.SetBasicAuth(b.name, b.password)
}

resp, err := doHttpWithTimeout(b.client.HttpClient(), req, b.viewTimeout)
if err != nil {
Expand Down Expand Up @@ -123,50 +124,19 @@ func (b *Bucket) ExecuteViewQuery(q *ViewQuery) (ViewResults, error) {
}, nil
}

// Performs a view query and returns a list of rows or an error.
func (b *Bucket) ExecuteViewQuery(q *ViewQuery) (ViewResults, error) {
return b.executeViewQuery("_view", q.ddoc, q.name, q.options)
}

// Performs a spatial query and returns a list of rows or an error.
func (b *Bucket) ExecuteSpatialQuery(q *SpatialQuery) (ViewResults, error) {
capiEp, err := b.getViewEp()
if err != nil {
return nil, err
}

reqUri := fmt.Sprintf("%s/_design/%s/_spatial/%s?%s", capiEp, q.ddoc, q.name, q.options.Encode())

req, err := http.NewRequest("GET", reqUri, nil)
if err != nil {
return nil, err
}
req.SetBasicAuth(b.name, b.password)

resp, err := doHttpWithTimeout(b.client.HttpClient(), req, b.viewTimeout)
if err != nil {
return nil, err
}

viewResp := viewResponse{}
jsonDec := json.NewDecoder(resp.Body)
jsonDec.Decode(&viewResp)

resp.Body.Close()

if resp.StatusCode != 200 {
if viewResp.Error != "" {
return nil, &viewError{
Message: viewResp.Error,
Reason: viewResp.Reason,
}
}

return nil, &viewError{
Message: "HTTP Error",
Reason: fmt.Sprintf("Status code was %d.", resp.StatusCode),
}
}
return b.executeViewQuery("_spatial", q.ddoc, q.name, q.options)
}

return &viewResults{
index: -1,
rows: viewResp.Rows,
}, nil
type n1qlCache struct {
name string
encodedPlan string
}

type n1qlError struct {
Expand Down Expand Up @@ -243,28 +213,6 @@ func (r *n1qlResults) One(valuePtr interface{}) error {
return nil
}

// Wrapper around net.http.Client.Do().
// This allows a per-request timeout without setting the timeout on the Client object
// directly.
// The third parameter is the duration for the request itself.
func doHttpWithTimeout(cli *http.Client, req *http.Request, timeout time.Duration) (resp *http.Response, err error) {
if timeout.Seconds() == 0 {
// No timeout
resp, err = cli.Do(req)
return
}

tmoch := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
tmoch <- struct{}{}
})

req.Cancel = tmoch
resp, err = cli.Do(req)
timer.Stop()
return
}

// Executes the N1QL query (in opts) on the server n1qlEp.
// This function assumes that `opts` already contains all the required
// settings. This function will inject any additional connection or request-level
Expand All @@ -286,12 +234,20 @@ func (b *Bucket) executeN1qlQuery(n1qlEp string, opts map[string]interface{}) (V
opts["timeout"] = timeout.String()
}

if b.cluster.auth != nil {
userPasses := b.cluster.auth.bucketN1ql(b.name)
opts["creds"] = userPasses
} else {
opts["creds"] = []userPassPair{
userPassPair{b.name, b.password},
}
}

req, err := http.NewRequest("POST", reqUri, bytes.NewBuffer(reqJson))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(b.name, b.password)

resp, err := doHttpWithTimeout(b.client.HttpClient(), req, timeout)
if err != nil {
Expand Down Expand Up @@ -333,7 +289,10 @@ func (b *Bucket) prepareN1qlQuery(n1qlEp string, opts map[string]interface{}) (*
return nil, err
}

var preped n1qlPrepData
var preped struct {
EncodedPlan string `json:"encoded_plan"`
Name string `json:"name"`
}
err = prepRes.One(&preped)
if err != nil {
return nil, err
Expand All @@ -345,11 +304,6 @@ func (b *Bucket) prepareN1qlQuery(n1qlEp string, opts map[string]interface{}) (*
}, nil
}

type n1qlPrepData struct {
EncodedPlan string `json:"encoded_plan"`
Name string `json:"name"`
}

// Performs a spatial query and returns a list of rows or an error.
func (b *Bucket) ExecuteN1qlQuery(q *N1qlQuery, params interface{}) (ViewResults, error) {
n1qlEp, err := b.getN1qlEp()
Expand Down
25 changes: 22 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type Cluster struct {
spec connSpec
auth Authenticator
connectTimeout time.Duration
serverConnectTimeout time.Duration
}
Expand Down Expand Up @@ -120,12 +121,30 @@ func (c *Cluster) makeAgentConfig(bucket, password string) *gocbcore.AgentConfig
}
}

func (c *Cluster) Authenticate(auth Authenticator) error {
c.auth = auth
return nil
}

func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) {
if password == "" {
if c.auth != nil {
password = c.auth.bucketMemd(bucket)
}
}

agentConfig := c.makeAgentConfig(bucket, password)
return createBucket(agentConfig)
return createBucket(c, agentConfig)
}

func (c *Cluster) Manager(username, password string) *ClusterManager {
userPass := userPassPair{username, password}
if username == "" || password == "" {
if c.auth != nil {
userPass = c.auth.clusterMgmt()
}
}

_, httpHosts, isSslHosts := specToHosts(c.spec)
var mgmtHosts []string

Expand All @@ -146,8 +165,8 @@ func (c *Cluster) Manager(username, password string) *ClusterManager {

return &ClusterManager{
hosts: mgmtHosts,
username: username,
password: password,
username: userPass.Username,
password: userPass.Password,
httpCli: &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
Expand Down
28 changes: 28 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package gocb

import (
"net/http"
"time"
)

// Wrapper around net.http.Client.Do().
// This allows a per-request timeout without setting the timeout on the Client object
// directly.
// The third parameter is the duration for the request itself.
func doHttpWithTimeout(cli *http.Client, req *http.Request, timeout time.Duration) (resp *http.Response, err error) {
if timeout.Seconds() == 0 {
// No timeout
resp, err = cli.Do(req)
return
}

tmoch := make(chan struct{})
timer := time.AfterFunc(timeout, func() {
tmoch <- struct{}{}
})

req.Cancel = tmoch
resp, err = cli.Do(req)
timer.Stop()
return
}

0 comments on commit aabbf83

Please sign in to comment.