Skip to content

Commit 64076e7

Browse files
authored
Merge pull request #446 from GoRethink/develop
External custom marshalers
2 parents 9b148cc + b0a1d0a commit 64076e7

20 files changed

+573
-174
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22
All notable changes to this project will be documented in this file.
33
This project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## v4.1.0 - 2018-08-29
6+
7+
### Fixed
8+
9+
- Rare `Connection` leaks if socket errors occurred
10+
- Updated `ql2.proto` file from rethinkdb repo
11+
12+
### Added
13+
14+
- Support for independent custom type marshalers
15+
516
## v4.0.0 - 2017-12-14
617

718
### Fixed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
![GoRethink Logo](https://raw.github.com/wiki/gorethink/gorethink/gopher-and-thinker-s.png "Golang Gopher and RethinkDB Thinker")
1111

12-
Current version: v4.0.0 (RethinkDB v2.3)
12+
Current version: v4.1.0 (RethinkDB v2.3)
1313

1414
<!-- This project is no longer maintained, for more information see the [v3.0.0 release](https://github.com/gorethink/gorethink/releases/tag/v3.0.0)-->
1515

@@ -40,7 +40,7 @@ import (
4040

4141
func Example() {
4242
session, err := r.Connect(r.ConnectOpts{
43-
Address: url,
43+
Address: url, // endpoint without http
4444
})
4545
if err != nil {
4646
log.Fatalln(err)
@@ -400,6 +400,8 @@ Sometimes the default behaviour for converting Go types to and from ReQL is not
400400

401401
An good example of how to use these interfaces is in the [`types`](https://github.com/gorethink/gorethink/blob/master/types/geometry.go#L84-L106) package, in this package the `Point` type is encoded as the `GEOMETRY` pseudo-type instead of a normal JSON object.
402402

403+
On the other side, you can implement external encode/decode functions with [`SetTypeEncoding`](https://godoc.org/github.com/gorethink/gorethink/encoding#SetTypeEncoding) function.
404+
403405
## Logging
404406

405407
By default the driver logs are disabled however when enabled the driver will log errors when it fails to connect to the database. If you would like more verbose error logging you can call `r.SetVerbose(true)`.

cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import (
77
"sync/atomic"
88
"time"
99

10-
"github.com/sirupsen/logrus"
1110
"github.com/cenkalti/backoff"
1211
"github.com/hailocab/go-hostpool"
12+
"github.com/sirupsen/logrus"
1313
"golang.org/x/net/context"
1414
)
1515

connection.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
"sync/atomic"
1010
"time"
1111

12-
"golang.org/x/net/context"
13-
p "gopkg.in/gorethink/gorethink.v4/ql2"
14-
"sync"
12+
"bytes"
1513
"github.com/opentracing/opentracing-go"
1614
"github.com/opentracing/opentracing-go/ext"
1715
"github.com/opentracing/opentracing-go/log"
18-
"bytes"
16+
"golang.org/x/net/context"
17+
p "gopkg.in/gorethink/gorethink.v4/ql2"
18+
"sync"
1919
)
2020

2121
const (

connection_test.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package gorethink
22

33
import (
4-
test "gopkg.in/check.v1"
5-
p "gopkg.in/gorethink/gorethink.v4/ql2"
6-
"golang.org/x/net/context"
74
"encoding/binary"
85
"encoding/json"
6+
"github.com/opentracing/opentracing-go"
7+
"github.com/opentracing/opentracing-go/mocktracer"
8+
"golang.org/x/net/context"
9+
test "gopkg.in/check.v1"
10+
p "gopkg.in/gorethink/gorethink.v4/ql2"
911
"io"
1012
"time"
11-
"github.com/opentracing/opentracing-go/mocktracer"
12-
"github.com/opentracing/opentracing-go"
1313
)
1414

1515
type ConnectionSuite struct{}
@@ -52,7 +52,7 @@ func (s *ConnectionSuite) TestConnection_Query_Ok(c *test.C) {
5252
func (s *ConnectionSuite) TestConnection_Query_DefaultDBOk(c *test.C) {
5353
ctx := context.Background()
5454
token := int64(1)
55-
q := testQuery(Table("table").Get("id"),)
55+
q := testQuery(Table("table").Get("id"))
5656
q2 := q
5757
q2.Opts["db"], _ = DB("db").Build()
5858
writeData := serializeQuery(token, q2)
@@ -134,7 +134,7 @@ func (s *ConnectionSuite) TestConnection_Query_NoReplyOk(c *test.C) {
134134
connection := newConnection(conn, "addr", &ConnectOpts{})
135135
connection.runConnection()
136136
response, cursor, err := connection.Query(nil, q)
137-
time.Sleep(5*time.Millisecond)
137+
time.Sleep(5 * time.Millisecond)
138138
connection.Close()
139139

140140
c.Assert(response, test.IsNil)
@@ -247,9 +247,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_SocketErr(c *test.C) {
247247
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
248248
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise2}
249249
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 2}, promise: promise3}
250-
time.Sleep(5*time.Millisecond)
250+
time.Sleep(5 * time.Millisecond)
251251
connection.responseChan <- responseAndError{err: io.EOF}
252-
time.Sleep(5*time.Millisecond)
252+
time.Sleep(5 * time.Millisecond)
253253

254254
select {
255255
case f := <-promise1:
@@ -284,9 +284,9 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {
284284

285285
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
286286
close(connection.responseChan)
287-
time.Sleep(5*time.Millisecond)
287+
time.Sleep(5 * time.Millisecond)
288288
close(connection.stopReadChan)
289-
time.Sleep(5*time.Millisecond)
289+
time.Sleep(5 * time.Millisecond)
290290

291291
select {
292292
case f := <-promise1:
@@ -299,7 +299,7 @@ func (s *ConnectionSuite) TestConnection_processResponses_StopOk(c *test.C) {
299299

300300
func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.C) {
301301
promise1 := make(chan responseAndCursor, 1)
302-
response1 := &Response{Token:1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}
302+
response1 := &Response{Token: 1, Type: p.Response_RUNTIME_ERROR, ErrorType: p.Response_INTERNAL}
303303

304304
conn := &connMock{}
305305
conn.On("Close").Return(nil)
@@ -309,11 +309,11 @@ func (s *ConnectionSuite) TestConnection_processResponses_ResponseFirst(c *test.
309309
go connection.processResponses()
310310

311311
connection.responseChan <- responseAndError{response: response1}
312-
time.Sleep(5*time.Millisecond)
312+
time.Sleep(5 * time.Millisecond)
313313
connection.readRequestsChan <- tokenAndPromise{query: &Query{Token: 1}, promise: promise1}
314-
time.Sleep(5*time.Millisecond)
314+
time.Sleep(5 * time.Millisecond)
315315
connection.Close()
316-
time.Sleep(5*time.Millisecond)
316+
time.Sleep(5 * time.Millisecond)
317317

318318
select {
319319
case f := <-promise1:
@@ -437,8 +437,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_FirstPartialOk(c *test.
437437
ctx := context.Background()
438438
token := int64(3)
439439
q := Query{Token: token}
440-
rawResponse1 := json.RawMessage{1,2,3}
441-
rawResponse2 := json.RawMessage{3,4,5}
440+
rawResponse1 := json.RawMessage{1, 2, 3}
441+
rawResponse2 := json.RawMessage{3, 4, 5}
442442
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}
443443

444444
connection := newConnection(nil, "addr", &ConnectOpts{})
@@ -463,8 +463,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_PartialOk(c *test.C) {
463463
token := int64(3)
464464
term := Table("test")
465465
q := Query{Token: token}
466-
rawResponse1 := json.RawMessage{1,2,3}
467-
rawResponse2 := json.RawMessage{3,4,5}
466+
rawResponse1 := json.RawMessage{1, 2, 3}
467+
rawResponse2 := json.RawMessage{3, 4, 5}
468468
response := &Response{Token: token, Type: p.Response_SUCCESS_PARTIAL, Responses: []json.RawMessage{rawResponse1, rawResponse2}}
469469

470470
connection := newConnection(nil, "addr", &ConnectOpts{})
@@ -491,8 +491,8 @@ func (s *ConnectionSuite) TestConnection_processResponse_SequenceOk(c *test.C) {
491491

492492
token := int64(3)
493493
q := Query{Token: token}
494-
rawResponse1 := json.RawMessage{1,2,3}
495-
rawResponse2 := json.RawMessage{3,4,5}
494+
rawResponse1 := json.RawMessage{1, 2, 3}
495+
rawResponse2 := json.RawMessage{3, 4, 5}
496496
response := &Response{Token: token, Type: p.Response_SUCCESS_SEQUENCE, Responses: []json.RawMessage{rawResponse1, rawResponse2}}
497497

498498
connection := newConnection(nil, "addr", &ConnectOpts{})

cursor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
"reflect"
88
"sync"
99

10+
"github.com/opentracing/opentracing-go"
1011
"golang.org/x/net/context"
1112
"gopkg.in/gorethink/gorethink.v4/encoding"
1213
p "gopkg.in/gorethink/gorethink.v4/ql2"
13-
"github.com/opentracing/opentracing-go"
1414
)
1515

1616
var (

encoding/decoder.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
var byteSliceType = reflect.TypeOf([]byte(nil))
1111

12-
type decoderFunc func(dv reflect.Value, sv reflect.Value)
12+
type decoderFunc func(dv reflect.Value, sv reflect.Value) error
1313

1414
// Decode decodes map[string]interface{} into a struct. The first parameter
1515
// must be a pointer.
@@ -54,18 +54,16 @@ func decode(dst interface{}, src interface{}, blank bool) (err error) {
5454
}
5555
}
5656

57-
decodeValue(dv, sv, blank)
58-
return nil
57+
return decodeValue(dv, sv, blank)
5958
}
6059

6160
// decodeValue decodes the source value into the destination value
62-
func decodeValue(dv, sv reflect.Value, blank bool) {
63-
valueDecoder(dv, sv, blank)(dv, sv)
61+
func decodeValue(dv, sv reflect.Value, blank bool) error {
62+
return valueDecoder(dv, sv, blank)(dv, sv)
6463
}
6564

6665
type decoderCacheKey struct {
6766
dt, st reflect.Type
68-
blank bool
6967
}
7068

7169
var decoderCache struct {
@@ -90,7 +88,7 @@ func valueDecoder(dv, sv reflect.Value, blank bool) decoderFunc {
9088

9189
func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
9290
decoderCache.RLock()
93-
f := decoderCache.m[decoderCacheKey{dt, st, blank}]
91+
f := decoderCache.m[decoderCacheKey{dt, st}]
9492
decoderCache.RUnlock()
9593
if f != nil {
9694
return f
@@ -103,9 +101,9 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
103101
decoderCache.Lock()
104102
var wg sync.WaitGroup
105103
wg.Add(1)
106-
decoderCache.m[decoderCacheKey{dt, st, blank}] = func(dv, sv reflect.Value) {
104+
decoderCache.m[decoderCacheKey{dt, st}] = func(dv, sv reflect.Value) error {
107105
wg.Wait()
108-
f(dv, sv)
106+
return f(dv, sv)
109107
}
110108
decoderCache.Unlock()
111109

@@ -114,7 +112,7 @@ func typeDecoder(dt, st reflect.Type, blank bool) decoderFunc {
114112
f = newTypeDecoder(dt, st, blank)
115113
wg.Done()
116114
decoderCache.Lock()
117-
decoderCache.m[decoderCacheKey{dt, st, blank}] = f
115+
decoderCache.m[decoderCacheKey{dt, st}] = f
118116
decoderCache.Unlock()
119117
return f
120118
}

0 commit comments

Comments
 (0)