Skip to content

Commit e37fa1b

Browse files
committed
api: updated response/future methods
Added Future and Response's interface method Release. Connection now contains sync.Pool that was used to reduce allocations. Updated Future's Get methods. Fixes #493
1 parent 713316b commit e37fa1b

File tree

11 files changed

+180
-33
lines changed

11 files changed

+180
-33
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1212

1313
* New types for MessagePack extensions compatible with go-option (#459).
1414
* Added `box.MustNew` wrapper for `box.New` without an error (#448).
15+
* Method `Release` for `Future` and `Response` interface:
16+
`Get` and `GetTyped` are calling `Release` by defer.
17+
`GetResult` and `GetTypedResult` just return result without `Release` (#493).
1518

1619
### Changed
1720

connection.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,12 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
175175
// More on graceful shutdown:
176176
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
177177
type Connection struct {
178-
addr net.Addr
179-
dialer Dialer
180-
c Conn
181-
mutex sync.Mutex
182-
cond *sync.Cond
178+
addr net.Addr
179+
dialer Dialer
180+
c Conn
181+
mutex sync.Mutex
182+
cond *sync.Cond
183+
slicePool *sync.Pool
183184
// schemaResolver contains a SchemaResolver implementation.
184185
schemaResolver SchemaResolver
185186
// requestId contains the last request ID for requests with nil context.
@@ -373,7 +374,12 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373374
}
374375

375376
conn.cond = sync.NewCond(&conn.mutex)
376-
377+
conn.slicePool = &sync.Pool{
378+
New: func() any {
379+
buf := make([]byte, 0, 4096)
380+
return &buf
381+
},
382+
}
377383
if conn.opts.Reconnect > 0 {
378384
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379385
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -848,8 +854,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
848854

849855
go conn.eventer(events)
850856

857+
buf := smallBuf{}
851858
for atomic.LoadUint32(&conn.state) != connClosed {
852-
respBytes, err := read(r, conn.lenbuf[:])
859+
respBytes, err := read(r, conn.lenbuf[:], conn)
853860
if err != nil {
854861
err = ClientError{
855862
ErrIoError,
@@ -858,7 +865,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
858865
conn.reconnect(err, c)
859866
return
860867
}
861-
buf := smallBuf{b: respBytes}
868+
buf.b, buf.p = respBytes, 0
862869
header, code, err := decodeHeader(conn.dec, &buf)
863870
if err != nil {
864871
err = ClientError{
@@ -925,7 +932,7 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
925932

926933
func (conn *Connection) newFuture(req Request) (fut *Future) {
927934
ctx := req.Ctx()
928-
fut = NewFuture(req)
935+
fut = NewFuture(req, conn)
929936
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
930937
select {
931938
case conn.rlimit <- struct{}{}:
@@ -1187,7 +1194,7 @@ func (conn *Connection) timeouts() {
11871194
}
11881195
}
11891196

1190-
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
1197+
func read(r io.Reader, lenbuf []byte, conn ...*Connection) (response []byte, err error) {
11911198
var length uint64
11921199

11931200
if _, err = io.ReadFull(r, lenbuf); err != nil {
@@ -1211,7 +1218,15 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12111218
return
12121219
}
12131220

1214-
response = make([]byte, length)
1221+
if len(conn) == 0 {
1222+
response = make([]byte, length)
1223+
} else {
1224+
response = *conn[0].slicePool.Get().(*[]byte)
1225+
if cap(response) < int(length) {
1226+
response = make([]byte, length)
1227+
}
1228+
response = response[:length]
1229+
}
12151230
_, err = io.ReadFull(r, response)
12161231

12171232
return
@@ -1232,7 +1247,7 @@ func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
12321247
func (conn *Connection) Do(req Request) *Future {
12331248
if connectedReq, ok := req.(ConnectedRequest); ok {
12341249
if connectedReq.Conn() != conn {
1235-
fut := NewFuture(req)
1250+
fut := NewFuture(req, conn)
12361251
fut.SetError(errUnknownRequest)
12371252
return fut
12381253
}

example_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,7 @@ func ExampleConnection_Do_failure() {
12911291

12921292
// We got a future, the request actually not performed yet.
12931293
future := conn.Do(req)
1294+
defer future.Release()
12941295

12951296
// When the future receives the response, the result of the Future is set
12961297
// and becomes available. We could wait for that moment with Future.Get(),
@@ -1305,7 +1306,7 @@ func ExampleConnection_Do_failure() {
13051306
fmt.Printf("Response error: %s\n", resp.Header().Error)
13061307
}
13071308

1308-
data, err := future.Get()
1309+
data, err := future.GetResult()
13091310
if err != nil {
13101311
fmt.Printf("Data: %v\n", data)
13111312
}

future.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type Future struct {
1212
req Request
1313
next *Future
1414
timeout time.Duration
15+
pool *sync.Pool
1516
mutex sync.Mutex
1617
resp Response
1718
err error
@@ -38,10 +39,15 @@ func (fut *Future) isDone() bool {
3839
}
3940

4041
// NewFuture creates a new empty Future for a given Request.
41-
func NewFuture(req Request) (fut *Future) {
42+
func NewFuture(req Request, conn ...*Connection) (fut *Future) {
4243
fut = &Future{}
4344
fut.done = make(chan struct{})
4445
fut.req = req
46+
if len(conn) == 0 {
47+
fut.pool = nil
48+
} else {
49+
fut.pool = conn[0].slicePool
50+
}
4551
return fut
4652
}
4753

@@ -89,12 +95,28 @@ func (fut *Future) GetResponse() (Response, error) {
8995
}
9096

9197
// Get waits for Future to be filled and returns the data of the Response and error.
98+
// Also Release Future's data. After this, Future becomes invalid.
9299
//
93100
// The data will be []interface{}, so if you want more performance, use GetTyped method.
94101
//
95102
// "error" could be Error, if it is error returned by Tarantool,
96103
// or ClientError, if something bad happens in a client process.
97104
func (fut *Future) Get() ([]interface{}, error) {
105+
defer fut.Release()
106+
fut.wait()
107+
if fut.err != nil {
108+
return nil, fut.err
109+
}
110+
return fut.resp.Decode()
111+
}
112+
113+
// Get waits for Future to be filled and returns the data of the Response and error.
114+
//
115+
// The data will be []interface{}, so if you want more performance, use GetTyped method.
116+
//
117+
// "error" could be Error, if it is error returned by Tarantool,
118+
// or ClientError, if something bad happens in a client process.
119+
func (fut *Future) GetResult() ([]interface{}, error) {
98120
fut.wait()
99121
if fut.err != nil {
100122
return nil, fut.err
@@ -105,8 +127,23 @@ func (fut *Future) Get() ([]interface{}, error) {
105127
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
106128
// It is could be much faster than Get() function.
107129
//
130+
// Also Release Future's data. After this, Future becomes invalid.
131+
//
108132
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
109133
func (fut *Future) GetTyped(result interface{}) error {
134+
defer fut.Release()
135+
fut.wait()
136+
if fut.err != nil {
137+
return fut.err
138+
}
139+
return fut.resp.DecodeTyped(result)
140+
}
141+
142+
// GetTyped waits for Future and calls msgpack.Decoder.Decode(result) if no error happens.
143+
// It is could be much faster than Get() function.
144+
//
145+
// Note: Tarantool usually returns array of tuples (except for Eval and Call17 actions).
146+
func (fut *Future) GetTypedResult(result interface{}) error {
110147
fut.wait()
111148
if fut.err != nil {
112149
return fut.err
@@ -127,3 +164,12 @@ func (fut *Future) WaitChan() <-chan struct{} {
127164
}
128165
return fut.done
129166
}
167+
168+
// Release is freeing the Future resources.
169+
// After this, using this Future becomes invalid.
170+
func (fut *Future) Release() {
171+
if fut.pool != nil && fut.resp != nil {
172+
ptr := fut.resp.Release()
173+
fut.pool.Put(ptr)
174+
}
175+
}

future_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ func (resp *futureMockResponse) Header() Header {
5353
return resp.header
5454
}
5555

56+
func (resp *futureMockResponse) Release() *[]byte {
57+
// Releasing futureMockResponse data.
58+
return &resp.data
59+
}
60+
5661
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5762
resp.decodeCnt++
5863

pool/connection_pool_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,12 +1744,12 @@ func TestDoWithExecuteRequest(t *testing.T) {
17441744
mem := []Member{}
17451745

17461746
fut := connPool.Do(tarantool.NewExecuteRequest(request).Args([]interface{}{}), pool.ANY)
1747-
data, err := fut.Get()
1747+
data, err := fut.GetResult()
17481748
require.Nilf(t, err, "failed to Do with ExecuteRequest")
17491749
require.NotNilf(t, data, "response is nil after Execute")
17501750
require.GreaterOrEqualf(t, len(data), 1, "response.Data is empty after Do with ExecuteRequest")
17511751
require.Equalf(t, len(data[0].([]interface{})), 2, "unexpected response")
1752-
err = fut.GetTyped(&mem)
1752+
err = fut.GetTypedResult(&mem)
17531753
require.Nilf(t, err, "Unable to GetTyped of fut")
17541754
require.Equalf(t, len(mem), 1, "wrong count of result")
17551755
}

prepared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93-
return &PrepareResponse{baseResponse: baseResp}, nil
93+
return &PrepareResponse{baseResponse: *baseResp}, nil
9494
}
9595

9696
// UnprepareRequest helps you to create an unprepare request object for
@@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
204204
if err != nil {
205205
return nil, err
206206
}
207-
return &ExecuteResponse{baseResponse: baseResp}, nil
207+
return &ExecuteResponse{baseResponse: *baseResp}, nil
208208
}

request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,11 +1031,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {
10311031

10321032
// Response creates a response for the SelectRequest.
10331033
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
1034-
baseResp, err := createBaseResponse(header, body)
1034+
SelectResp, err := createSelectResponse(header, body)
10351035
if err != nil {
10361036
return nil, err
10371037
}
1038-
return &SelectResponse{baseResponse: baseResp}, nil
1038+
return SelectResp, nil
10391039
}
10401040

10411041
// InsertRequest helps you to create an insert request object for execution
@@ -1565,7 +1565,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
15651565
if err != nil {
15661566
return nil, err
15671567
}
1568-
return &ExecuteResponse{baseResponse: baseResp}, nil
1568+
return &ExecuteResponse{baseResponse: *baseResp}, nil
15691569
}
15701570

15711571
// WatchOnceRequest synchronously fetches the value currently associated with a

0 commit comments

Comments
 (0)