Skip to content

Commit

Permalink
api: add support of a batch insert request
Browse files Browse the repository at this point in the history
Add support the IPROTO_INSERT_ARROW request and
message pack type MP_ARROW.

Closes #399
  • Loading branch information
dmyger committed Oct 10, 2024
1 parent d1f60e0 commit 7d49fd0
Show file tree
Hide file tree
Showing 14 changed files with 802 additions and 20 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
56 changes: 56 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type.
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
return Arrow{arrow}, nil
}

// Raw returns a []byte that contains Arrow raw data.
func (a Arrow) Raw() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("arrowDecoder: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("arrowDecoder: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
arr, ok := v.Interface().(Arrow)
if !ok {
return []byte{}, fmt.Errorf("arrowEncoder: not an Arrow type")
}
return arr.data, nil
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
100 changes: 100 additions & 0 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package arrow_test

import (
"bytes"
"encoding/hex"
"testing"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/vmihailenco/msgpack/v5"
)

var longArrow, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

var tests = []struct {
name string
arr []byte
enc []byte
}{
{
"abc",
[]byte{'a', 'b', 'c'},
[]byte{0xc7, 0x3, 0x8, 'a', 'b', 'c'},
},
{
"empty",
[]byte{},
[]byte{0xc7, 0x0, 0x8},
},
{
"one",
[]byte{1},
[]byte{0xd4, 0x8, 0x1},
},
{
"long",
longArrow,
[]byte{
0xc8, 0x1, 0x10, 0x8, 0xff, 0xff, 0xff, 0xff, 0x70, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0,
0x0, 0x9e, 0xff, 0xff, 0xff, 0x4, 0x0, 0x1, 0x0, 0x4, 0x0, 0x0, 0x0, 0xb6, 0xff, 0xff,
0xff, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xda, 0xff, 0xff, 0xff, 0x14, 0x0, 0x0, 0x0, 0x2, 0x2, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xf0, 0xff, 0xff, 0xff, 0x40, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x61, 0x0, 0x0, 0x0, 0x6, 0x0, 0x8, 0x0, 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0x4, 0x0, 0x8,
0x0, 0x9, 0x0, 0xc, 0x0, 0xc, 0x0, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8, 0x0,
0xa, 0x0, 0xc, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xff, 0xff, 0xff, 0xff, 0x88, 0x0,
0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8a, 0xff, 0xff, 0xff, 0x4, 0x0, 0x3, 0x0, 0x10, 0x0,
0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xac, 0xff, 0xff,
0xff, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x34, 0x0, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x8, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x14, 0x0,
0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0xc, 0x0, 0x14, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xc,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
},
},
}

func TestEncodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := bytes.NewBuffer([]byte{})
enc := msgpack.NewEncoder(buf)

arr, err := arrow.MakeArrow(tt.arr)
require.NoError(t, err)

err = enc.Encode(arr)
require.NoError(t, err)

require.Equal(t, tt.enc, buf.Bytes())
})

}
}

func TestDecodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

buf := bytes.NewBuffer(tt.enc)
dec := msgpack.NewDecoder(buf)

var arr arrow.Arrow
err := dec.Decode(&arr)
require.NoError(t, err)

require.Equal(t, tt.arr, arr.Raw())
})
}
}
31 changes: 31 additions & 0 deletions arrow/config-memcs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg {
work_dir = os.getenv("TEST_TNT_WORK_DIR")
}

box.schema.user.create('test', {
password = 'test',
if_not_exists = true
})
box.schema.user.grant('test', 'execute', 'universe', nil, {
if_not_exists = true
})

local s = box.schema.space.create('testArrow', {
engine = 'memcs',
field_count = 1,
format = {{'a', 'uint64'}},
if_not_exists = true
})
s:create_index('primary')
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
if_not_exists = true
})

-- Set listen only when every other thing is configured.
box.cfg {
listen = 3013
}
35 changes: 35 additions & 0 deletions arrow/config-memtx.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg {
work_dir = os.getenv("TEST_TNT_WORK_DIR")
}

box.schema.user.create('test', {
password = 'test',
if_not_exists = true
})
box.schema.user.grant('test', 'execute', 'universe', nil, {
if_not_exists = true
})

local s = box.schema.space.create('testArrow', {
if_not_exists = true
})
s:create_index('primary', {
type = 'tree',
parts = {{
field = 1,
type = 'integer'
}},
if_not_exists = true
})
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
if_not_exists = true
})

-- Set listen only when every other thing is configured.
box.cfg {
listen = os.getenv("TEST_TNT_LISTEN")
}
61 changes: 61 additions & 0 deletions arrow/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Run Tarantool Enterprise Edition instance before example execution:
//
// Terminal 1:
// $ cd arrow
// $ TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config-memcs.lua
//
// Terminal 2:
// $ go test -v example_test.go
package arrow_test

import (
"context"
"encoding/hex"
"fmt"
"log"
"time"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
)

var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

func Example() {
dialer := tarantool.NetDialer{
Address: "127.0.0.1:3013",
User: "test",
Password: "test",
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err)
}

arr, err := arrow.MakeArrow(arrowBinData)
if err != nil {
log.Fatalf("Failed prepare Arrow data: %s", err)
}

req := arrow.NewInsertRequest("testArrow", arr)

resp, err := client.Do(req).Get()
if err != nil {
log.Fatalf("Failed insert Arrow: %s", err)
}
if len(resp) > 0 {
log.Fatalf("Unexpected response")
} else {
fmt.Printf("Batch arrow inserted")
}
}
93 changes: 93 additions & 0 deletions arrow/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package arrow

import (
"context"
"io"

"github.com/tarantool/go-iproto"
"github.com/tarantool/go-tarantool/v2"
"github.com/vmihailenco/msgpack/v5"
)

// INSERT Arrow request.
//
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
// https://github.com/tarantool/go-tarantool/issues/412
const iprotoInsertArrowType = iproto.Type(17)

// The data in Arrow format.
//
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
// https://github.com/tarantool/go-tarantool/issues/412
const iprotoArrowKey = iproto.Key(0x36)

// InsertRequest helps you to create an insert request object for execution
// by a Connection.
type InsertRequest struct {
arrow Arrow
space interface{}
ctx context.Context
}

// NewInsertRequest returns a new InsertRequest.
func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
return &InsertRequest{
space: space,
arrow: arrow,
}
}

// Type returns a IPROTO_INSERT_ARROW type for the request.
func (r *InsertRequest) Type() iproto.Type {
return iprotoInsertArrowType
}

// Async returns false to the request return a response.
func (r *InsertRequest) Async() bool {
return false
}

// Ctx returns a context of the request.
func (r *InsertRequest) Ctx() context.Context {
return r.ctx
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
r.ctx = ctx
return r
}

// Arrow sets the arrow for insertion the insert arrow request.
// Note: default value is nil.
func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
r.arrow = arrow
return r
}

// Body fills an msgpack.Encoder with the insert arrow request body.
func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
if err := enc.EncodeMapLen(2); err != nil {
return err
}
if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
return err
}
return enc.Encode(r.arrow)
}

// Response creates a response for the InsertRequest.
func (r *InsertRequest) Response(
header tarantool.Header,
body io.Reader,
) (tarantool.Response, error) {
return tarantool.DecodeBaseResponse(header, body)
}
Loading

0 comments on commit 7d49fd0

Please sign in to comment.