Skip to content

Commit

Permalink
GOCBC-31: Added support for MutationToken's and ObserveSeqNo.
Browse files Browse the repository at this point in the history
Implement HELLO, MutationToken and the ObserveSeqNo operation
 in gocbcore and added MutationToken based durability polling
 to the gocb library.

Change-Id: I732145dcdca123f8a2d57248d027b90b12eda6a8
Reviewed-on: http://review.couchbase.org/54299
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Brett Lawson <brett19@gmail.com>
  • Loading branch information
brett19 committed Sep 10, 2015
1 parent bcbfbf8 commit 8b7b554
Show file tree
Hide file tree
Showing 6 changed files with 391 additions and 128 deletions.
155 changes: 112 additions & 43 deletions bucket_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,93 @@ import (
"github.com/couchbaselabs/gocb/gocbcore"
)

type MutationToken gocbcore.MutationToken

// Retrieves a document from the bucket
func (b *Bucket) Get(key string, valuePtr interface{}) (Cas, error) {
cas, _, err := b.get(key, valuePtr)
return cas, err
}

// Retrieves a document and simultaneously updates its expiry time.
func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (Cas, error) {
cas, _, err := b.getAndTouch(key, expiry, valuePtr)
return cas, err
}

// Locks a document for a period of time, providing exclusive RW access to it.
func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (Cas, error) {
cas, _, err := b.getAndLock(key, lockTime, valuePtr)
return cas, err
}

// Unlocks a document which was locked with GetAndLock.
func (b *Bucket) Unlock(key string, cas Cas) (Cas, error) {
cas, _, err := b.unlock(key, cas)
return cas, err
}

// Returns the value of a particular document from a replica server.
func (b *Bucket) GetReplica(key string, valuePtr interface{}, replicaIdx int) (Cas, error) {
cas, _, err := b.getReplica(key, valuePtr, replicaIdx)
return cas, err
}

// Touches a document, specifying a new expiry time for it.
func (b *Bucket) Touch(key string, cas Cas, expiry uint32) (Cas, error) {
cas, _, err := b.touch(key, cas, expiry)
return cas, err
}

// Removes a document from the bucket.
func (b *Bucket) Remove(key string, cas Cas) (Cas, error) {
cas, _, err := b.remove(key, cas)
return cas, err
}

// Inserts or replaces a document in the bucket.
func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (Cas, error) {
cas, _, err := b.upsert(key, value, expiry)
return cas, err
}

// Inserts a new document to the bucket.
func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (Cas, error) {
cas, _, err := b.insert(key, value, expiry)
return cas, err
}

// Replaces a document in the bucket.
func (b *Bucket) Replace(key string, value interface{}, cas Cas, expiry uint32) (Cas, error) {
cas, _, err := b.replace(key, value, cas, expiry)
return cas, err
}

// Appends a string value to a document.
func (b *Bucket) Append(key, value string) (Cas, error) {
cas, _, err := b.append(key, value)
return cas, err
}

// Prepends a string value to a document.
func (b *Bucket) Prepend(key, value string) (Cas, error) {
cas, _, err := b.prepend(key, value)
return cas, err
}

// Performs an atomic addition or subtraction for an integer document.
func (b *Bucket) Counter(key string, delta, initial int64, expiry uint32) (uint64, Cas, error) {
val, cas, _, err := b.counter(key, delta, initial, expiry)
return val, cas, err
}

type ioGetCallback func([]byte, uint32, gocbcore.Cas, error)
type ioCasCallback func(gocbcore.Cas, error)
type ioCtrCallback func(uint64, gocbcore.Cas, error)
type ioCasCallback func(gocbcore.Cas, gocbcore.MutationToken, error)
type ioCtrCallback func(uint64, gocbcore.Cas, gocbcore.MutationToken, error)

type hlpGetHandler func(ioGetCallback) (pendingOp, error)

func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut Cas, errOut error) {
func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut Cas, mtOut MutationToken, errOut error) {
signal := make(chan bool, 1)
op, err := execFn(func(bytes []byte, flags uint32, cas gocbcore.Cas, err error) {
errOut = err
Expand All @@ -23,7 +103,7 @@ func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut
signal <- true
})
if err != nil {
return 0, err
return 0, MutationToken{}, err
}

timeoutTmr := acquireTimer(b.opTimeout)
Expand All @@ -34,23 +114,24 @@ func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut
case <-timeoutTmr.C:
releaseTimer(timeoutTmr, true)
op.Cancel()
return 0, timeoutError{}
return 0, MutationToken{}, timeoutError{}
}
}

type hlpCasHandler func(ioCasCallback) (pendingOp, error)

func (b *Bucket) hlpCasExec(execFn hlpCasHandler) (casOut Cas, errOut error) {
func (b *Bucket) hlpCasExec(execFn hlpCasHandler) (casOut Cas, mtOut MutationToken, errOut error) {
signal := make(chan bool, 1)
op, err := execFn(func(cas gocbcore.Cas, err error) {
op, err := execFn(func(cas gocbcore.Cas, mt gocbcore.MutationToken, err error) {
errOut = err
if errOut == nil {
casOut = Cas(cas)
mtOut = MutationToken(mt)
}
signal <- true
})
if err != nil {
return 0, err
return 0, MutationToken{}, err
}

timeoutTmr := acquireTimer(b.opTimeout)
Expand All @@ -61,24 +142,25 @@ func (b *Bucket) hlpCasExec(execFn hlpCasHandler) (casOut Cas, errOut error) {
case <-timeoutTmr.C:
releaseTimer(timeoutTmr, true)
op.Cancel()
return 0, timeoutError{}
return 0, MutationToken{}, timeoutError{}
}
}

type hlpCtrHandler func(ioCtrCallback) (pendingOp, error)

func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut Cas, errOut error) {
func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut Cas, mtOut MutationToken, errOut error) {
signal := make(chan bool, 1)
op, err := execFn(func(value uint64, cas gocbcore.Cas, err error) {
op, err := execFn(func(value uint64, cas gocbcore.Cas, mt gocbcore.MutationToken, err error) {
errOut = err
if errOut == nil {
valOut = value
casOut = Cas(cas)
mtOut = MutationToken(mt)
}
signal <- true
})
if err != nil {
return 0, 0, err
return 0, 0, MutationToken{}, err
}

timeoutTmr := acquireTimer(b.opTimeout)
Expand All @@ -89,71 +171,63 @@ func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut Cas, er
case <-timeoutTmr.C:
releaseTimer(timeoutTmr, true)
op.Cancel()
return 0, 0, timeoutError{}
return 0, 0, MutationToken{}, timeoutError{}
}
}

// Retrieves a document from the bucket
func (b *Bucket) Get(key string, valuePtr interface{}) (Cas, error) {
func (b *Bucket) get(key string, valuePtr interface{}) (Cas, MutationToken, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.Get([]byte(key), gocbcore.GetCallback(cb))
return op, err
})
}

// Retrieves a document and simultaneously updates its expiry time.
func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (Cas, error) {
func (b *Bucket) getAndTouch(key string, expiry uint32, valuePtr interface{}) (Cas, MutationToken, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.GetAndTouch([]byte(key), expiry, gocbcore.GetCallback(cb))
return op, err
})
}

// Locks a document for a period of time, providing exclusive RW access to it.
func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (Cas, error) {
func (b *Bucket) getAndLock(key string, lockTime uint32, valuePtr interface{}) (Cas, MutationToken, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.GetAndLock([]byte(key), lockTime, gocbcore.GetCallback(cb))
return op, err
})
}

// Unlocks a document which was locked with GetAndLock.
func (b *Bucket) Unlock(key string, cas Cas) (Cas, error) {
func (b *Bucket) unlock(key string, cas Cas) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.Unlock([]byte(key), gocbcore.Cas(cas), gocbcore.UnlockCallback(cb))
return op, err
})
}

// Returns the value of a particular document from a replica server.
func (b *Bucket) GetReplica(key string, valuePtr interface{}, replicaIdx int) (Cas, error) {
func (b *Bucket) getReplica(key string, valuePtr interface{}, replicaIdx int) (Cas, MutationToken, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.GetReplica([]byte(key), replicaIdx, gocbcore.GetCallback(cb))
return op, err
})
}

// Touches a document, specifying a new expiry time for it.
func (b *Bucket) Touch(key string, cas Cas, expiry uint32) (Cas, error) {
func (b *Bucket) touch(key string, cas Cas, expiry uint32) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.Touch([]byte(key), gocbcore.Cas(cas), expiry, gocbcore.TouchCallback(cb))
return op, err
})
}

// Removes a document from the bucket.
func (b *Bucket) Remove(key string, cas Cas) (Cas, error) {
func (b *Bucket) remove(key string, cas Cas) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.Remove([]byte(key), gocbcore.Cas(cas), gocbcore.RemoveCallback(cb))
return op, err
})
}

// Inserts or replaces a document in the bucket.
func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (Cas, error) {
func (b *Bucket) upsert(key string, value interface{}, expiry uint32) (Cas, MutationToken, error) {
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
return 0, MutationToken{}, err
}

return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
Expand All @@ -162,11 +236,10 @@ func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (Cas, erro
})
}

// Inserts a new document to the bucket.
func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (Cas, error) {
func (b *Bucket) insert(key string, value interface{}, expiry uint32) (Cas, MutationToken, error) {
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
return 0, MutationToken{}, err
}

return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
Expand All @@ -175,11 +248,10 @@ func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (Cas, erro
})
}

// Replaces a document in the bucket.
func (b *Bucket) Replace(key string, value interface{}, cas Cas, expiry uint32) (Cas, error) {
func (b *Bucket) replace(key string, value interface{}, cas Cas, expiry uint32) (Cas, MutationToken, error) {
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
return 0, MutationToken{}, err
}

return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
Expand All @@ -188,24 +260,21 @@ func (b *Bucket) Replace(key string, value interface{}, cas Cas, expiry uint32)
})
}

// Appends a string value to a document.
func (b *Bucket) Append(key, value string) (Cas, error) {
func (b *Bucket) append(key, value string) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.Append([]byte(key), []byte(value), gocbcore.StoreCallback(cb))
return op, err
})
}

// Prepends a string value to a document.
func (b *Bucket) Prepend(key, value string) (Cas, error) {
func (b *Bucket) prepend(key, value string) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.Prepend([]byte(key), []byte(value), gocbcore.StoreCallback(cb))
return op, err
})
}

// Performs an atomic addition or subtraction for an integer document.
func (b *Bucket) Counter(key string, delta, initial int64, expiry uint32) (uint64, Cas, error) {
func (b *Bucket) counter(key string, delta, initial int64, expiry uint32) (uint64, Cas, MutationToken, error) {
realInitial := uint64(0xFFFFFFFFFFFFFFFF)
if initial > 0 {
realInitial = uint64(initial)
Expand All @@ -222,6 +291,6 @@ func (b *Bucket) Counter(key string, delta, initial int64, expiry uint32) (uint6
return op, err
})
} else {
return 0, 0, clientError{"Delta must be a non-zero value."}
return 0, 0, MutationToken{}, clientError{"Delta must be a non-zero value."}
}
}
Loading

0 comments on commit 8b7b554

Please sign in to comment.