diff --git a/bucket.go b/bucket.go index 15e92070..a9cebdc1 100644 --- a/bucket.go +++ b/bucket.go @@ -11,10 +11,15 @@ import ( // An interface representing a single bucket within a cluster. type Bucket struct { - name string - password string - httpCli *http.Client - client *gocbcore.Agent + name string + password string + httpCli *http.Client + client *gocbcore.Agent + transcoder Transcoder +} + +func (b *Bucket) SetTranscoder(transcoder Transcoder) { + b.transcoder = transcoder } func (b *Bucket) afterOpTimeout() <-chan time.Time { @@ -29,18 +34,17 @@ type ioCtrCallback func(uint64, uint64, error) type hlpGetHandler func(ioGetCallback) (pendingOp, error) -func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (valOut interface{}, casOut uint64, errOut error) { +func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (casOut uint64, errOut error) { signal := make(chan bool, 1) op, err := execFn(func(bytes []byte, flags uint32, cas uint64, err error) { go func() { if err != nil { errOut = err } else { - value, err := b.decodeValue(bytes, flags, valuePtr) + err = b.transcoder.Decode(bytes, flags, valuePtr) if err != nil { errOut = err } else { - valOut = value casOut = cas } @@ -49,7 +53,7 @@ func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (valOut }() }) if err != nil { - return nil, 0, err + return 0, err } select { @@ -57,7 +61,7 @@ func (b *Bucket) hlpGetExec(valuePtr interface{}, execFn hlpGetHandler) (valOut return case <-b.afterOpTimeout(): op.Cancel() - return nil, 0, timeoutError{} + return 0, timeoutError{} } } @@ -117,7 +121,7 @@ func (b *Bucket) hlpCtrExec(execFn hlpCtrHandler) (valOut uint64, casOut uint64, } // Retrieves a document from the bucket -func (b *Bucket) Get(key string, valuePtr interface{}) (interface{}, uint64, error) { +func (b *Bucket) Get(key string, valuePtr interface{}) (uint64, error) { return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) { op, err := b.client.Get([]byte(key), gocbcore.GetCallback(cb)) return op, err @@ -125,7 +129,7 @@ func (b *Bucket) Get(key string, valuePtr interface{}) (interface{}, uint64, err } // Retrieves a document and simultaneously updates its expiry time. -func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (interface{}, uint64, error) { +func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (uint64, error) { return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) { op, err := b.client.GetAndTouch([]byte(key), expiry, gocbcore.GetCallback(cb)) return op, err @@ -133,7 +137,7 @@ func (b *Bucket) GetAndTouch(key string, expiry uint32, valuePtr interface{}) (i } // Locks a document for a period of time, providing exclusive RW access to it. -func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (interface{}, uint64, error) { +func (b *Bucket) GetAndLock(key string, lockTime uint32, valuePtr interface{}) (uint64, error) { return b.hlpGetExec(valuePtr, func(cb ioGetCallback) (pendingOp, error) { op, err := b.client.GetAndLock([]byte(key), lockTime, gocbcore.GetCallback(cb)) return op, err @@ -171,7 +175,7 @@ func (b *Bucket) Remove(key string, cas uint64) (casOut uint64, errOut error) { // Inserts or replaces a document in the bucket. func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (casOut uint64, errOut error) { - bytes, flags, err := b.encodeValue(value) + bytes, flags, err := b.transcoder.Encode(value) if err != nil { return 0, err } @@ -184,7 +188,7 @@ func (b *Bucket) Upsert(key string, value interface{}, expiry uint32) (casOut ui // Inserts a new document to the bucket. func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (uint64, error) { - bytes, flags, err := b.encodeValue(value) + bytes, flags, err := b.transcoder.Encode(value) if err != nil { return 0, err } @@ -197,7 +201,7 @@ func (b *Bucket) Insert(key string, value interface{}, expiry uint32) (uint64, e // Replaces a document in the bucket. func (b *Bucket) Replace(key string, value interface{}, cas uint64, expiry uint32) (uint64, error) { - bytes, flags, err := b.encodeValue(value) + bytes, flags, err := b.transcoder.Encode(value) if err != nil { return 0, err } diff --git a/cluster.go b/cluster.go index 17346cd0..bcd638fe 100644 --- a/cluster.go +++ b/cluster.go @@ -94,6 +94,7 @@ func (c *Cluster) OpenBucket(bucket, password string) (*Bucket, error) { }, }, }, + transcoder: &DefaultTranscoder{}, }, nil } diff --git a/transcoding.go b/transcoding.go index 639225a1..ed89a894 100644 --- a/transcoding.go +++ b/transcoding.go @@ -4,7 +4,15 @@ import ( "encoding/json" ) -func (b *Bucket) decodeValue(bytes []byte, flags uint32, out interface{}) (interface{}, error) { +type Transcoder interface { + Decode([]byte, uint32, interface{}) error + Encode(interface{}) ([]byte, uint32, error) +} + +type DefaultTranscoder struct { +} + +func (t DefaultTranscoder) Decode(bytes []byte, flags uint32, out interface{}) error { // Check for legacy flags if flags&cfMask == 0 { // Legacy Flags @@ -12,46 +20,34 @@ func (b *Bucket) decodeValue(bytes []byte, flags uint32, out interface{}) (inter // Legacy JSON flags = cfFmtJson } else { - return nil, clientError{"Unexpected legacy flags value"} + return clientError{"Unexpected legacy flags value"} } } // Make sure compression is disabled if flags&cfCmprMask != cfCmprNone { - return nil, clientError{"Unexpected value compression"} - } - - // If an output object was passed, try to json Unmarshal to it - if out != nil { - if flags&cfFmtJson != 0 { - err := json.Unmarshal(bytes, out) - if err != nil { - return nil, clientError{err.Error()} - } - return out, nil - } else { - return nil, clientError{"Unmarshal target passed, but type does not match."} - } + return clientError{"Unexpected value compression"} } // Normal types of decoding if flags&cfFmtMask == cfFmtBinary { - return bytes, nil + *(out.(*[]byte)) = bytes + return nil } else if flags&cfFmtMask == cfFmtString { - return string(bytes[0:]), nil + *(out.(*string)) = string(bytes) + return nil } else if flags&cfFmtMask == cfFmtJson { - var outVal interface{} - err := json.Unmarshal(bytes, &outVal) + err := json.Unmarshal(bytes, &out) if err != nil { - return nil, clientError{err.Error()} + return err } - return outVal, nil + return nil } else { - return nil, clientError{"Unexpected flags value"} + return clientError{"Unexpected flags value"} } } -func (b *Bucket) encodeValue(value interface{}) ([]byte, uint32, error) { +func (t DefaultTranscoder) Encode(value interface{}) ([]byte, uint32, error) { var bytes []byte var flags uint32 var err error