Skip to content

External custom marshalers #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## v4.1.0 - 2018-08-29

### Fixed

- Rare `Connection` leaks if socket errors occurred
- Updated `ql2.proto` file from rethinkdb repo

### Added

- Support for independent custom type marshalers

## v4.0.0 - 2017-12-14

### Fixed
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

![GoRethink Logo](https://raw.github.com/wiki/gorethink/gorethink/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker")

Current version: v4.0.0 (RethinkDB v2.3)
Current version: v4.1.0 (RethinkDB v2.3)

<!-- This project is no longer maintained, for more information see the [v3.0.0 release](https://github.com/gorethink/gorethink/releases/tag/v3.0.0)-->

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this commented code?


Expand Down Expand Up @@ -40,7 +40,7 @@ import (

func Example() {
session, err := r.Connect(r.ConnectOpts{
Address: url,
Address: url, // endpoint without http
})
if err != nil {
log.Fatalln(err)
Expand Down Expand Up @@ -400,6 +400,8 @@ Sometimes the default behaviour for converting Go types to and from ReQL is not

An good example of how to use these interfaces is in the [`types`](https://github.com/gorethink/gorethink/blob/master/types/geometry.go#L84-L106) package, in this package the `Point` type is encoded as the `GEOMETRY` pseudo-type instead of a normal JSON object.

On the other side, you can implement external encode/decode functions with [`SetTypeEncoding`](https://godoc.org/github.com/gorethink/gorethink/encoding#SetTypeEncoding) function.

## Logging

By default the driver logs are disabled however when enabled the driver will log errors when it fails to connect to the database. If you would like more verbose error logging you can call `r.SetVerbose(true)`.
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/hailocab/go-hostpool"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

Expand Down
8 changes: 4 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"sync"
"bytes"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"bytes"
"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"sync"
)

const (
Expand Down
42 changes: 21 additions & 21 deletions connection_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package gorethink

import (
test "gopkg.in/check.v1"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"golang.org/x/net/context"
"encoding/binary"
"encoding/json"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"golang.org/x/net/context"
test "gopkg.in/check.v1"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"io"
"time"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/opentracing/opentracing-go"
)

type ConnectionSuite struct{}
Expand Down Expand Up @@ -52,7 +52,7 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) {
func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) {
ctx := context.Background()
token := int64(1)
q := testQuery(Table("table").Get("id"),)
q := testQuery(Table("table").Get("id"))
q2 := q
q2.Opts["db"], _ = DB("db").Build()
writeData := serializeQuery(token, q2)
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) {
connection := newConnection(conn, "addr", &ConnectOpts{})
connection.runConnection()
response, cursor, err := connection.Query(nil, q)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.Close()

c.Assert(response, test.IsNil)
Expand Down Expand Up @@ -247,9 +247,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_SocketErr(c *test.C) {
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise2}
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise3}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.responseChan <- responseAndError{err: io.EOF}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand Down Expand Up @@ -284,9 +284,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {

connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
close(connection.responseChan)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
close(connection.stopReadChan)
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand All @@ -299,7 +299,7 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {

func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.C) {
promise1 := make(chan responseAndCursor, 1)
response1 := &Response{Token:1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}
response1 := &Response{Token: 1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}

conn := &connMock{}
conn.On("Close").Return(nil)
Expand All @@ -309,11 +309,11 @@ func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.
go connection.processResponses()

connection.responseChan <- responseAndError{response: response1}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)
connection.Close()
time.Sleep(5*time.Millisecond)
time.Sleep(5 * time.Millisecond)

select {
case f := <-promise1:
Expand Down Expand Up @@ -437,8 +437,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_FirstPartialOk(c *test.
ctx := context.Background()
token := int64(3)
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand All @@ -463,8 +463,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_PartialOk(c *test.C) {
token := int64(3)
term := Table("test")
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand All @@ -491,8 +491,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_SequenceOk(c *test.C) {

token := int64(3)
q := Query{Token: token}
rawResponse1 := json.RawMessage{1,2,3}
rawResponse2 := json.RawMessage{3,4,5}
rawResponse1 := json.RawMessage{1, 2, 3}
rawResponse2 := json.RawMessage{3, 4, 5}
response := &Response{Token: token, Type: p.Response_SUCCESS_SEQUENCE, Responses: []json.RawMessage{rawResponse1, rawResponse2}}

connection := newConnection(nil, "addr", &ConnectOpts{})
Expand Down
2 changes: 1 addition & 1 deletion cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"reflect"
"sync"

"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"gopkg.in/gorethink/gorethink.v4/encoding"
p "gopkg.in/gorethink/gorethink.v4/ql2"
"github.com/opentracing/opentracing-go"
)

var (
Expand Down
18 changes: 8 additions & 10 deletions encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var byteSliceType = reflect.TypeOf([]byte(nil))

type decoderFunc func(dv reflect.Value, sv reflect.Value)
type decoderFunc func(dv reflect.Value, sv reflect.Value) error

// Decode decodes map[string]interface{} into a struct. The first parameter
// must be a pointer.
Expand Down Expand Up @@ -54,18 +54,16 @@ func decode(dst interface{}, src interface{}, blank bool) (err error) {
}
}

decodeValue(dv, sv, blank)
return nil
return decodeValue(dv, sv, blank)
}

// decodeValue decodes the source value into the destination value
func decodeValue(dv, sv reflect.Value, blank bool) {
valueDecoder(dv, sv, blank)(dv, sv)
func decodeValue(dv, sv reflect.Value, blank bool) error {
return valueDecoder(dv, sv, blank)(dv, sv)
}

type decoderCacheKey struct {
dt, st reflect.Type
blank bool
}

var decoderCache struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK to use global anonymous struct here?

Expand All @@ -90,7 +88,7 @@ func valueDecoder(dv, sv reflect.Value, blank bool) decoderFunc {

func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
decoderCache.RLock()
f := decoderCache.m[decoderCacheKey{dt, st, blank}]
f := decoderCache.m[decoderCacheKey{dt, st}]
decoderCache.RUnlock()
if f != nil {
return f
Expand All @@ -103,9 +101,9 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
decoderCache.Lock()
var wg sync.WaitGroup
wg.Add(1)
decoderCache.m[decoderCacheKey{dt, st, blank}] = func(dv, sv reflect.Value) {
decoderCache.m[decoderCacheKey{dt, st}] = func(dv, sv reflect.Value) error {
wg.Wait()
f(dv, sv)
return f(dv, sv)
}
decoderCache.Unlock()

Expand All @@ -114,7 +112,7 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
f = newTypeDecoder(dt, st, blank)
wg.Done()
decoderCache.Lock()
decoderCache.m[decoderCacheKey{dt, st, blank}] = f
decoderCache.m[decoderCacheKey{dt, st}] = f
decoderCache.Unlock()
return f
}
Expand Down
Loading