Skip to content

Commit

Permalink
GOCBC-59: Add support for AT_PLUS N1QL querying.
Browse files Browse the repository at this point in the history
Change-Id: Ic3ad6f54604e3c18390527579a2d1bb521cd8162
Reviewed-on: http://review.couchbase.org/63856
Reviewed-by: Mark Nunberg <mark.nunberg@couchbase.com>
Tested-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
brett19 committed May 17, 2016
1 parent b1c6616 commit c96d044
Show file tree
Hide file tree
Showing 19 changed files with 278 additions and 63 deletions.
10 changes: 6 additions & 4 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (

// An interface representing a single bucket within a cluster.
type Bucket struct {
cluster *Cluster
name string
password string
client *gocbcore.Agent
cluster *Cluster
name string
password string
client *gocbcore.Agent
mtEnabled bool

transcoder Transcoder
opTimeout time.Duration
Expand All @@ -34,6 +35,7 @@ func createBucket(cluster *Cluster, config *gocbcore.AgentConfig) (*Bucket, erro
name: config.BucketName,
password: config.Password,
client: cli,
mtEnabled: config.UseMutationTokens,
transcoder: &DefaultTranscoder{},

opTimeout: 2500 * time.Millisecond,
Expand Down
6 changes: 2 additions & 4 deletions bucket_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"github.com/couchbase/gocb/gocbcore"
)

type MutationToken gocbcore.MutationToken

// Retrieves a document from the bucket
func (b *Bucket) Get(key string, valuePtr interface{}) (Cas, error) {
return b.get(key, valuePtr)
Expand Down Expand Up @@ -158,7 +156,7 @@ func (b *Bucket) hlpCasExec(execFn hlpCasHandler) (casOut Cas, mtOut MutationTok
errOut = err
if errOut == nil {
casOut = Cas(cas)
mtOut = MutationToken(mt)
mtOut = MutationToken{mt, b}
}
signal <- true
})
Expand Down Expand Up @@ -190,7 +188,7 @@ func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut Cas, mt
if errOut == nil {
valOut = value
casOut = Cas(cas)
mtOut = MutationToken(mt)
mtOut = MutationToken{mt, b}
}
signal <- true
})
Expand Down
8 changes: 4 additions & 4 deletions bucket_dura.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ func (b *Bucket) observeOnceCas(key []byte, cas Cas, forDelete bool, repId int,
}

func (b *Bucket) observeOnceSeqNo(key []byte, mt MutationToken, repId int, commCh chan uint) (pendingOp, error) {
return b.client.ObserveSeqNo(key, mt.VbUuid, repId,
return b.client.ObserveSeqNo(key, mt.token.VbUuid, repId,
func(currentSeqNo gocbcore.SeqNo, persistSeqNo gocbcore.SeqNo, err error) {
if err != nil {
commCh <- 0
return
}

didReplicate := currentSeqNo >= mt.SeqNo
didPersist := persistSeqNo >= mt.SeqNo
didReplicate := currentSeqNo >= mt.token.SeqNo
didPersist := persistSeqNo >= mt.token.SeqNo

var out uint
if didReplicate {
Expand All @@ -78,7 +78,7 @@ func (b *Bucket) observeOnceSeqNo(key []byte, mt MutationToken, repId int, commC

func (b *Bucket) observeOne(key []byte, mt MutationToken, cas Cas, forDelete bool, repId int, replicaCh, persistCh chan bool) {
observeOnce := func(commCh chan uint) (pendingOp, error) {
if mt.VbUuid != 0 && mt.SeqNo != 0 {
if mt.token.VbUuid != 0 && mt.token.SeqNo != 0 {
return b.observeOnceSeqNo(key, mt, repId, commCh)
} else {
return b.observeOnceCas(key, cas, forDelete, repId, commCh)
Expand Down
2 changes: 1 addition & 1 deletion bucket_subdoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (b *Bucket) mutateIn(set *MutateInBuilder) (resOut *DocumentFragment, errOu
if errOut == nil {
resSet := &DocumentFragment{
cas: Cas(cas),
mt: MutationToken(mt),
mt: MutationToken{mt, b},
}
resSet.contents = make([]subDocResult, len(results))

Expand Down
57 changes: 57 additions & 0 deletions bucket_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gocb

// Performs a Remove operation and includes MutationToken in the results.
func (b *Bucket) RemoveMt(key string, cas Cas) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.remove(key, cas)
}

// Performs a Upsert operation and includes MutationToken in the results.
func (b *Bucket) UpsertMt(key string, value interface{}, expiry uint32) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.upsert(key, value, expiry)
}

// Performs a Insert operation and includes MutationToken in the results.
func (b *Bucket) InsertMt(key string, value interface{}, expiry uint32) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.insert(key, value, expiry)
}

// Performs a Replace operation and includes MutationToken in the results.
func (b *Bucket) ReplaceMt(key string, value interface{}, cas Cas, expiry uint32) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.replace(key, value, cas, expiry)
}

// Performs a Append operation and includes MutationToken in the results.
func (b *Bucket) AppendMt(key, value string) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.append(key, value)
}

// Performs a Prepend operation and includes MutationToken in the results.
func (b *Bucket) PrependMt(key, value string) (Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.prepend(key, value)
}

// Performs a Counter operation and includes MutationToken in the results.
func (b *Bucket) CounterMt(key string, delta, initial int64, expiry uint32) (uint64, Cas, MutationToken, error) {
if !b.mtEnabled {
panic("You must use OpenBucketMt with Mt operation variants.")
}
return b.counter(key, delta, initial, expiry)
}
18 changes: 13 additions & 5 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func specToHosts(spec connSpec) ([]string, []string, bool) {
return memdHosts, httpHosts, spec.Scheme.IsSSL()
}

func (c *Cluster) makeAgentConfig(bucket, password string) (*gocbcore.AgentConfig, error) {
func (c *Cluster) makeAgentConfig(bucket, password string, mt bool) (*gocbcore.AgentConfig, error) {
authFn := func(srv gocbcore.AuthClient, deadline time.Time) error {
// Build PLAIN auth data
userBuf := []byte(bucket)
Expand Down Expand Up @@ -143,7 +143,7 @@ func (c *Cluster) makeAgentConfig(bucket, password string) (*gocbcore.AgentConfi
BucketName: bucket,
Password: password,
AuthHandler: authFn,
UseMutationTokens: false,
UseMutationTokens: mt,
ConnectTimeout: c.connectTimeout,
ServerConnectTimeout: c.serverConnectTimeout,
}, nil
Expand All @@ -154,14 +154,14 @@ func (c *Cluster) Authenticate(auth Authenticator) error {
return nil
}

func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) {
func (c *Cluster) openBucket(bucket, password string, mt bool) (*Bucket, error) {
if password == "" {
if c.auth != nil {
password = c.auth.bucketMemd(bucket)
}
}

agentConfig, err := c.makeAgentConfig(bucket, password)
agentConfig, err := c.makeAgentConfig(bucket, password, mt)
if err != nil {
return nil, err
}
Expand All @@ -178,6 +178,14 @@ func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) {
return b, nil
}

func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) {
return c.openBucket(bucket, password, false)
}

func (c *Cluster) OpenBucketWithMt(bucket, password string) (*Bucket, error) {
return c.openBucket(bucket, password, true)
}

func (c *Cluster) closeBucket(bucket *Bucket) {
c.clusterLock.Lock()
for i, e := range c.bucketList {
Expand Down Expand Up @@ -252,7 +260,7 @@ func (b *StreamingBucket) IoRouter() *gocbcore.Agent {
}

func (c *Cluster) OpenStreamingBucket(streamName, bucket, password string) (*StreamingBucket, error) {
agentConfig, err := c.makeAgentConfig(bucket, password)
agentConfig, err := c.makeAgentConfig(bucket, password, false)
if err != nil {
return nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions gocbcore/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ func (agent *Agent) Close() {
// requests which are not pending on a server queue.
if routingInfo.deadQueue != nil {
routingInfo.deadQueue.Drain(func(req *memdQRequest) {
req.Callback(nil, ErrShutdown)
req.Callback(nil, nil, ErrShutdown)
}, nil)
}
if routingInfo.waitQueue != nil {
routingInfo.waitQueue.Drain(func(req *memdQRequest) {
req.Callback(nil, ErrShutdown)
req.Callback(nil, nil, ErrShutdown)
}, nil)
}

Expand All @@ -306,7 +306,7 @@ func (agent *Agent) Close() {
for range routingInfo.servers {
s := <-agent.shutdownWaitCh
s.Drain(func(req *memdQRequest) {
req.Callback(nil, ErrShutdown)
req.Callback(nil, nil, ErrShutdown)
})
}
}
Expand All @@ -318,6 +318,9 @@ func (c *Agent) IsSecure() bool {

// Translates a particular key to its assigned vbucket.
func (c *Agent) KeyToVbucket(key []byte) uint16 {
if c.NumVbuckets() <= 0 {
return 0xFFFF
}
return uint16(cbCrc(key) % uint32(c.NumVbuckets()))
}

Expand Down
1 change: 1 addition & 0 deletions gocbcore/agentops.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type SeqNo uint64

// Represents a particular mutation within the cluster.
type MutationToken struct {
VbId uint16
VbUuid VbUuid
SeqNo SeqNo
}
Expand Down
Loading

0 comments on commit c96d044

Please sign in to comment.