Skip to content

Commit

Permalink
GOCBC-18: Support custom transcoders.
Browse files Browse the repository at this point in the history
  • Loading branch information
brett19 committed Feb 25, 2015
1 parent 5d293e3 commit f16edf9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
34 changes: 19 additions & 15 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ import (

// An interface representing a single bucket within a cluster.
type Bucket struct {
name string
password string
httpCli *http.Client
client *gocbcore.Agent
name string
password string
httpCli *http.Client
client *gocbcore.Agent
transcoder Transcoder
}

func (b *Bucket) SetTranscoder(transcoder Transcoder) {
b.transcoder = transcoder
}

func (b *Bucket) afterOpTimeout() <-chan time.Time {
Expand All @@ -29,18 +34,17 @@ type ioCtrCallback func(uint64, uint64, error)

type hlpGetHandler func(ioGetCallback) (pendingOp, error)

func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (valOut interface{}, casOut uint64, errOut error) {
func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut uint64, errOut error) {
signal := make(chan bool, 1)
op, err := execFn(func(bytes []byte, flags uint32, cas uint64, err error) {
go func() {
if err != nil {
errOut = err
} else {
value, err := b.decodeValue(bytes, flags, valuePtr)
err = b.transcoder.Decode(bytes, flags, valuePtr)
if err != nil {
errOut = err
} else {
valOut = value
casOut = cas
}

Expand All @@ -49,15 +53,15 @@ func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (valOut
}()
})
if err != nil {
return nil, 0, err
return 0, err
}

select {
case <-signal:
return
case <-b.afterOpTimeout():
op.Cancel()
return nil, 0, timeoutError{}
return 0, timeoutError{}
}
}

Expand Down Expand Up @@ -117,23 +121,23 @@ func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut uint64,
}

// Retrieves a document from the bucket
func (b *Bucket) Get(key string, valuePtr interface{}) (interface{}, uint64, error) {
func (b *Bucket) Get(key string, valuePtr interface{}) (uint64, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.Get([]byte(key), gocbcore.GetCallback(cb))
return op, err
})
}

// Retrieves a document and simultaneously updates its expiry time.
func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (interface{}, uint64, error) {
func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (uint64, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.GetAndTouch([]byte(key), expiry, gocbcore.GetCallback(cb))
return op, err
})
}

// Locks a document for a period of time, providing exclusive RW access to it.
func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (interface{}, uint64, error) {
func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (uint64, error) {
return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) {
op, err := b.client.GetAndLock([]byte(key), lockTime, gocbcore.GetCallback(cb))
return op, err
Expand Down Expand Up @@ -171,7 +175,7 @@ func (b *Bucket) Remove(key string, cas uint64) (casOut uint64, errOut error) {

// Inserts or replaces a document in the bucket.
func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (casOut uint64, errOut error) {
bytes, flags, err := b.encodeValue(value)
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
}
Expand All @@ -184,7 +188,7 @@ func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (casOut ui

// Inserts a new document to the bucket.
func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (uint64, error) {
bytes, flags, err := b.encodeValue(value)
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
}
Expand All @@ -197,7 +201,7 @@ func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (uint64, e

// Replaces a document in the bucket.
func (b *Bucket) Replace(key string, value interface{}, cas uint64, expiry uint32) (uint64, error) {
bytes, flags, err := b.encodeValue(value)
bytes, flags, err := b.transcoder.Encode(value)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) {
},
},
},
transcoder: &DefaultTranscoder{},
}, nil
}

Expand Down
44 changes: 20 additions & 24 deletions transcoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,50 @@ import (
"encoding/json"
)

func (b *Bucket) decodeValue(bytes []byte, flags uint32, out interface{}) (interface{}, error) {
type Transcoder interface {
Decode([]byte, uint32, interface{}) error
Encode(interface{}) ([]byte, uint32, error)
}

type DefaultTranscoder struct {
}

func (t DefaultTranscoder) Decode(bytes []byte, flags uint32, out interface{}) error {
// Check for legacy flags
if flags&cfMask == 0 {
// Legacy Flags
if flags == lfJson {
// Legacy JSON
flags = cfFmtJson
} else {
return nil, clientError{"Unexpected legacy flags value"}
return clientError{"Unexpected legacy flags value"}
}
}

// Make sure compression is disabled
if flags&cfCmprMask != cfCmprNone {
return nil, clientError{"Unexpected value compression"}
}

// If an output object was passed, try to json Unmarshal to it
if out != nil {
if flags&cfFmtJson != 0 {
err := json.Unmarshal(bytes, out)
if err != nil {
return nil, clientError{err.Error()}
}
return out, nil
} else {
return nil, clientError{"Unmarshal target passed, but type does not match."}
}
return clientError{"Unexpected value compression"}
}

// Normal types of decoding
if flags&cfFmtMask == cfFmtBinary {
return bytes, nil
*(out.(*[]byte)) = bytes
return nil
} else if flags&cfFmtMask == cfFmtString {
return string(bytes[0:]), nil
*(out.(*string)) = string(bytes)
return nil
} else if flags&cfFmtMask == cfFmtJson {
var outVal interface{}
err := json.Unmarshal(bytes, &outVal)
err := json.Unmarshal(bytes, &out)
if err != nil {
return nil, clientError{err.Error()}
return err
}
return outVal, nil
return nil
} else {
return nil, clientError{"Unexpected flags value"}
return clientError{"Unexpected flags value"}
}
}

func (b *Bucket) encodeValue(value interface{}) ([]byte, uint32, error) {
func (t DefaultTranscoder) Encode(value interface{}) ([]byte, uint32, error) {
var bytes []byte
var flags uint32
var err error
Expand Down

0 comments on commit f16edf9

Please sign in to comment.