From 403d9d19822ab3b483ec22f760741fcbdfa45126 Mon Sep 17 00:00:00 2001 From: DerekBum Date: Tue, 26 Sep 2023 15:51:19 +0300 Subject: [PATCH] api: add context to connection create `connection.Connect` and `pool.Connect` no longer return non-working connection objects. Those functions now accept context as their first arguments, which user may cancel in process. `connection.Connect` will block until either the working connection created (and returned), `opts.MaxReconnects` creation attempts were made (returns error) or the context is canceled by user (returns error too). Closes #136 --- CHANGELOG.md | 3 + README.md | 4 +- connection.go | 47 ++++--- crud/example_test.go | 3 +- crud/tarantool_test.go | 3 +- datetime/example_test.go | 3 +- decimal/example_test.go | 3 +- dial_test.go | 21 ++-- example_custom_unpacking_test.go | 3 +- example_test.go | 21 ++-- pool/connection_pool.go | 37 +++--- pool/connection_pool_test.go | 169 ++++++++++++++++---------- pool/example_test.go | 4 +- queue/example_connection_pool_test.go | 3 +- queue/example_msgpack_test.go | 3 +- queue/example_test.go | 3 +- settings/example_test.go | 3 +- shutdown_test.go | 3 +- ssl_test.go | 3 +- tarantool_test.go | 56 +++++++-- test_helpers/main.go | 3 +- test_helpers/pool_helper.go | 5 +- test_helpers/utils.go | 3 +- uuid/example_test.go | 4 +- 24 files changed, 270 insertions(+), 140 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 211ea25a2..58e6878c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. decoded to a varbinary object (#313). - Use objects of the Decimal type instead of pointers (#238) - Use objects of the Datetime type instead of pointers (#238) +- `connection.Connect` and `pool.Connect` no longer return non-working + connection objects (#136). Those functions now accept context as their first + arguments, which user may cancel in process. ### Deprecated diff --git a/README.md b/README.md index aa4c6deac..0b4520206 100644 --- a/README.md +++ b/README.md @@ -105,13 +105,15 @@ about what it does. package tarantool import ( + "context" "fmt" "github.com/tarantool/go-tarantool/v2" ) func main() { opts := tarantool.Opts{User: "guest"} - conn, err := tarantool.Connect("127.0.0.1:3301", opts) + ctx := context.Background() + conn, err := tarantool.Connect(ctx, "127.0.0.1:3301", opts) if err != nil { fmt.Println("Connection refused:", err) } diff --git a/connection.go b/connection.go index 9bb42626a..a5f8057e6 100644 --- a/connection.go +++ b/connection.go @@ -381,10 +381,11 @@ func (opts Opts) Clone() Opts { // - If opts.Reconnect is zero (default), then connection either already connected // or error is returned. // -// - If opts.Reconnect is non-zero, then error will be returned only if authorization -// fails. But if Tarantool is not reachable, then it will make an attempt to reconnect later -// and will not finish to make attempts on authorization failures. -func Connect(addr string, opts Opts) (conn *Connection, err error) { +// - If opts.Reconnect is non-zero, then error will be returned if authorization +// fails, or user has canceled context. If Tarantool is not reachable, then it +// will make attempts to reconnect and will not finish to make attempts on +// authorization failures. +func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) { conn = &Connection{ addr: addr, requestId: 0, @@ -432,7 +433,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { conn.cond = sync.NewCond(&conn.mutex) - if err = conn.createConnection(false); err != nil { + if err = conn.createConnection(ctx, false); err != nil { ter, ok := err.(Error) if conn.opts.Reconnect <= 0 { return nil, err @@ -441,15 +442,13 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { // Reported auth errors immediately. return nil, err } else { - // Without SkipSchema it is useless. - go func(conn *Connection) { - conn.mutex.Lock() - defer conn.mutex.Unlock() - if err := conn.createConnection(true); err != nil { - conn.closeConnection(err, true) - } - }(conn) - err = nil + conn.mutex.Lock() + defer conn.mutex.Unlock() + + if err := conn.createConnection(ctx, true); err != nil { + conn.closeConnection(err, true) + return nil, err + } } } @@ -658,7 +657,8 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, return } -func (conn *Connection) createConnection(reconnect bool) (err error) { +func (conn *Connection) createConnection(ctx context.Context, + reconnect bool) (err error) { var reconnects uint for conn.c == nil && conn.state == connDisconnected { now := time.Now() @@ -679,7 +679,20 @@ func (conn *Connection) createConnection(reconnect bool) (err error) { conn.notify(ReconnectFailed) reconnects++ conn.mutex.Unlock() - time.Sleep(time.Until(now.Add(conn.opts.Reconnect))) + + timer := time.NewTimer(time.Until(now.Add(conn.opts.Reconnect))) + waitLoop: + for { + select { + case <-ctx.Done(): + conn.mutex.Lock() + err = ClientError{ErrConnectionClosed, "context is canceled"} + return + case <-timer.C: + break waitLoop + } + } + conn.mutex.Lock() } if conn.state == connClosed { @@ -731,7 +744,7 @@ func (conn *Connection) reconnectImpl(neterr error, c Conn) { if conn.opts.Reconnect > 0 { if c == conn.c { conn.closeConnection(neterr, false) - if err := conn.createConnection(true); err != nil { + if err := conn.createConnection(context.Background(), true); err != nil { conn.closeConnection(err, true) } } diff --git a/crud/example_test.go b/crud/example_test.go index 363d0570d..567d2dfeb 100644 --- a/crud/example_test.go +++ b/crud/example_test.go @@ -1,6 +1,7 @@ package crud_test import ( + "context" "fmt" "reflect" "time" @@ -21,7 +22,7 @@ var exampleOpts = tarantool.Opts{ } func exampleConnect() *tarantool.Connection { - conn, err := tarantool.Connect(exampleServer, exampleOpts) + conn, err := tarantool.Connect(context.Background(), exampleServer, exampleOpts) if err != nil { panic("Connection is not established: " + err.Error()) } diff --git a/crud/tarantool_test.go b/crud/tarantool_test.go index 5cf29f66a..c88a228ac 100644 --- a/crud/tarantool_test.go +++ b/crud/tarantool_test.go @@ -1,6 +1,7 @@ package crud_test import ( + "context" "fmt" "log" "os" @@ -108,7 +109,7 @@ var object = crud.MapObject{ func connect(t testing.TB) *tarantool.Connection { for i := 0; i < 10; i++ { - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { t.Fatalf("Failed to connect: %s", err) } diff --git a/datetime/example_test.go b/datetime/example_test.go index 346551629..2f63208a2 100644 --- a/datetime/example_test.go +++ b/datetime/example_test.go @@ -9,6 +9,7 @@ package datetime_test import ( + "context" "fmt" "time" @@ -23,7 +24,7 @@ func Example() { User: "test", Pass: "test", } - conn, err := tarantool.Connect("127.0.0.1:3013", opts) + conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts) if err != nil { fmt.Printf("Error in connect is %v", err) return diff --git a/decimal/example_test.go b/decimal/example_test.go index a355767f1..0e32a51d2 100644 --- a/decimal/example_test.go +++ b/decimal/example_test.go @@ -9,6 +9,7 @@ package decimal_test import ( + "context" "log" "time" @@ -28,7 +29,7 @@ func Example() { User: "test", Pass: "test", } - client, err := tarantool.Connect(server, opts) + client, err := tarantool.Connect(context.Background(), server, opts) if err != nil { log.Fatalf("Failed to connect: %s", err.Error()) } diff --git a/dial_test.go b/dial_test.go index ff8a50aab..df12b41a1 100644 --- a/dial_test.go +++ b/dial_test.go @@ -2,6 +2,7 @@ package tarantool_test import ( "bytes" + "context" "errors" "net" "sync" @@ -29,9 +30,10 @@ func TestDialer_Dial_error(t *testing.T) { err: errors.New(errMsg), } - conn, err := tarantool.Connect("any", tarantool.Opts{ - Dialer: dialer, - }) + conn, err := tarantool.Connect(context.Background(), "any", + tarantool.Opts{ + Dialer: dialer, + }) assert.Nil(t, conn) assert.ErrorContains(t, err, errMsg) } @@ -73,7 +75,7 @@ func TestDialer_Dial_passedOpts(t *testing.T) { } dialer := &mockPassedDialer{} - conn, err := tarantool.Connect(addr, tarantool.Opts{ + conn, err := tarantool.Connect(context.Background(), addr, tarantool.Opts{ Dialer: dialer, Timeout: opts.IoTimeout, Transport: opts.Transport, @@ -203,11 +205,12 @@ func dialIo(t *testing.T, dialer := mockIoDialer{ init: init, } - conn, err := tarantool.Connect("any", tarantool.Opts{ - Dialer: &dialer, - Timeout: 1000 * time.Second, // Avoid pings. - SkipSchema: true, - }) + conn, err := tarantool.Connect(context.Background(), "any", + tarantool.Opts{ + Dialer: &dialer, + Timeout: 1000 * time.Second, // Avoid pings. + SkipSchema: true, + }) require.Nil(t, err) require.NotNil(t, conn) diff --git a/example_custom_unpacking_test.go b/example_custom_unpacking_test.go index 1189e16a3..bd60fd901 100644 --- a/example_custom_unpacking_test.go +++ b/example_custom_unpacking_test.go @@ -1,6 +1,7 @@ package tarantool_test import ( + "context" "fmt" "log" "time" @@ -84,7 +85,7 @@ func Example_customUnpacking() { User: "test", Pass: "test", } - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { log.Fatalf("Failed to connect: %s", err.Error()) } diff --git a/example_test.go b/example_test.go index d871d578a..3037519dd 100644 --- a/example_test.go +++ b/example_test.go @@ -19,7 +19,7 @@ type Tuple struct { } func exampleConnect(opts tarantool.Opts) *tarantool.Connection { - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { panic("Connection is not established: " + err.Error()) } @@ -38,7 +38,7 @@ func ExampleSslOpts() { CaFile: "testdata/ca.crt", }, } - _, err := tarantool.Connect("127.0.0.1:3013", opts) + _, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts) if err != nil { panic("Connection is not established: " + err.Error()) } @@ -913,12 +913,13 @@ func ExampleFuture_GetIterator() { } func ExampleConnect() { - conn, err := tarantool.Connect("127.0.0.1:3013", tarantool.Opts{ - Timeout: 5 * time.Second, - User: "test", - Pass: "test", - Concurrency: 32, - }) + conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", + tarantool.Opts{ + Timeout: 5 * time.Second, + User: "test", + Pass: "test", + Concurrency: 32, + }) if err != nil { fmt.Println("No connection available") return @@ -1081,7 +1082,7 @@ func ExampleConnection_NewPrepared() { User: "test", Pass: "test", } - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { fmt.Printf("Failed to connect: %s", err.Error()) } @@ -1127,7 +1128,7 @@ func ExampleConnection_NewWatcher() { Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature}, }, } - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { fmt.Printf("Failed to connect: %s\n", err) return diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 26e2199e9..3fde5619a 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -11,6 +11,7 @@ package pool import ( + "context" "errors" "log" "sync" @@ -133,7 +134,8 @@ func newEndpoint(addr string) *endpoint { // ConnectWithOpts creates pool for instances with addresses addrs // with options opts. -func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts Opts) (*ConnectionPool, error) { +func ConnectWithOpts(ctx context.Context, addrs []string, + connOpts tarantool.Opts, opts Opts) (*ConnectionPool, error) { if len(addrs) == 0 { return nil, ErrEmptyAddrs } @@ -161,7 +163,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts Opts) (*Conne connPool.addrs[addr] = nil } - somebodyAlive := connPool.fillPools() + somebodyAlive := connPool.fillPools(ctx) if !somebodyAlive { connPool.state.set(closedState) return nil, ErrNoConnection @@ -170,7 +172,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts Opts) (*Conne connPool.state.set(connectedState) for _, s := range connPool.addrs { - go connPool.controller(s) + go connPool.controller(ctx, s) } return connPool, nil @@ -181,11 +183,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts Opts) (*Conne // It is useless to set up tarantool.Opts.Reconnect value for a connection. // The connection pool has its own reconnection logic. See // Opts.CheckTimeout description. -func Connect(addrs []string, connOpts tarantool.Opts) (*ConnectionPool, error) { +func Connect(ctx context.Context, addrs []string, + connOpts tarantool.Opts) (*ConnectionPool, error) { opts := Opts{ CheckTimeout: 1 * time.Second, } - return ConnectWithOpts(addrs, connOpts, opts) + return ConnectWithOpts(ctx, addrs, connOpts, opts) } // ConnectedNow gets connected status of pool. @@ -224,7 +227,7 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) { // Add adds a new endpoint with the address into the pool. This function // adds the endpoint only after successful connection. -func (p *ConnectionPool) Add(addr string) error { +func (p *ConnectionPool) Add(ctx context.Context, addr string) error { e := newEndpoint(addr) p.addrsMutex.Lock() @@ -240,7 +243,7 @@ func (p *ConnectionPool) Add(addr string) error { p.addrs[addr] = e p.addrsMutex.Unlock() - if err := p.tryConnect(e); err != nil { + if err := p.tryConnect(ctx, e); err != nil { p.addrsMutex.Lock() delete(p.addrs, addr) p.addrsMutex.Unlock() @@ -248,7 +251,7 @@ func (p *ConnectionPool) Add(addr string) error { return err } - go p.controller(e) + go p.controller(ctx, e) return nil } @@ -1109,7 +1112,7 @@ func (p *ConnectionPool) handlerDeactivated(conn *tarantool.Connection, } } -func (p *ConnectionPool) fillPools() bool { +func (p *ConnectionPool) fillPools(ctx context.Context) bool { somebodyAlive := false // It is called before controller() goroutines so we don't expect @@ -1120,7 +1123,7 @@ func (p *ConnectionPool) fillPools() bool { connOpts := p.connOpts connOpts.Notify = end.notify - conn, err := tarantool.Connect(addr, connOpts) + conn, err := tarantool.Connect(ctx, addr, connOpts) if err != nil { log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error()) } else if conn != nil { @@ -1213,7 +1216,7 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { } } -func (p *ConnectionPool) tryConnect(e *endpoint) error { +func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { p.poolsMutex.Lock() if p.state.get() != connectedState { @@ -1226,7 +1229,7 @@ func (p *ConnectionPool) tryConnect(e *endpoint) error { connOpts := p.connOpts connOpts.Notify = e.notify - conn, err := tarantool.Connect(e.addr, connOpts) + conn, err := tarantool.Connect(ctx, e.addr, connOpts) if err == nil { role, err := p.getConnectionRole(conn) p.poolsMutex.Unlock() @@ -1265,7 +1268,7 @@ func (p *ConnectionPool) tryConnect(e *endpoint) error { return err } -func (p *ConnectionPool) reconnect(e *endpoint) { +func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) { p.poolsMutex.Lock() if p.state.get() != connectedState { @@ -1280,10 +1283,10 @@ func (p *ConnectionPool) reconnect(e *endpoint) { e.conn = nil e.role = UnknownRole - p.tryConnect(e) + p.tryConnect(ctx, e) } -func (p *ConnectionPool) controller(e *endpoint) { +func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { timer := time.NewTicker(p.opts.CheckTimeout) defer timer.Stop() @@ -1367,11 +1370,11 @@ func (p *ConnectionPool) controller(e *endpoint) { // Relocate connection between subpools // if ro/rw was updated. if e.conn == nil { - p.tryConnect(e) + p.tryConnect(ctx, e) } else if !e.conn.ClosedNow() { p.updateConnection(e) } else { - p.reconnect(e) + p.reconnect(ctx, e) } } } diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index dd0210d62..b20566325 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -1,6 +1,7 @@ package pool_test import ( + "context" "fmt" "log" "os" @@ -46,17 +47,18 @@ var defaultTimeoutRetry = 500 * time.Millisecond var instances []test_helpers.TarantoolInstance func TestConnError_IncorrectParams(t *testing.T) { - connPool, err := pool.Connect([]string{}, tarantool.Opts{}) + connPool, err := pool.Connect(context.Background(), []string{}, tarantool.Opts{}) require.Nilf(t, connPool, "conn is not nil with incorrect param") require.NotNilf(t, err, "err is nil with incorrect params") require.Equal(t, "addrs (first argument) should not be empty", err.Error()) - connPool, err = pool.Connect([]string{"err1", "err2"}, connOpts) + connPool, err = pool.Connect(context.Background(), []string{"err1", "err2"}, connOpts) require.Nilf(t, connPool, "conn is not nil with incorrect param") require.NotNilf(t, err, "err is nil with incorrect params") require.Equal(t, "no active connections", err.Error()) - connPool, err = pool.ConnectWithOpts(servers, tarantool.Opts{}, pool.Opts{}) + connPool, err = pool.ConnectWithOpts(context.Background(), servers, + tarantool.Opts{}, pool.Opts{}) require.Nilf(t, connPool, "conn is not nil with incorrect param") require.NotNilf(t, err, "err is nil with incorrect params") require.Equal(t, "wrong check timeout, must be greater than 0", err.Error()) @@ -64,7 +66,7 @@ func TestConnError_IncorrectParams(t *testing.T) { func TestConnSuccessfully(t *testing.T) { server := servers[0] - connPool, err := pool.Connect([]string{"err", server}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{"err", server}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -84,9 +86,52 @@ func TestConnSuccessfully(t *testing.T) { require.Nil(t, err) } +func TestConnErrorAfterCtxCancel(t *testing.T) { + var connLongReconnectOpts = tarantool.Opts{ + Timeout: 5 * time.Second, + User: "test", + Pass: "test", + Reconnect: time.Second, + MaxReconnects: 100, + } + + ctx, cancel := context.WithCancel(context.Background()) + + var connPool *pool.ConnectionPool + var err error + + go func() { + connPool, err = pool.Connect(ctx, []string{"err"}, + connLongReconnectOpts) + }() + + time.Sleep(5 * time.Second) + + if err != nil && connPool != nil { + t.Fatalf("Connect ended before cancel and reconnects") + } + cancel() + + retErr := test_helpers.Retry(func(interface{}) error { + if err == nil { + return fmt.Errorf("didn't get the error from Connect") + } + return nil + }, nil, 10, time.Second) + + if connPool != nil { + t.Fatalf("ConnectionPool was created after cancel") + } + if retErr != nil || err.Error() != "no active connections" { + t.Fatalf("Wrong error, exected %s, got %v", + "no active connections", err) + } +} + func TestConnSuccessfullyDuplicates(t *testing.T) { server := servers[0] - connPool, err := pool.Connect([]string{server, server, server, server}, connOpts) + connPool, err := pool.Connect(context.Background(), + []string{server, server, server, server}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -112,7 +157,7 @@ func TestConnSuccessfullyDuplicates(t *testing.T) { func TestReconnect(t *testing.T) { server := servers[0] - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -158,7 +203,7 @@ func TestDisconnect_withReconnect(t *testing.T) { opts := connOpts opts.Reconnect = 10 * time.Second - connPool, err := pool.Connect([]string{servers[serverId]}, opts) + connPool, err := pool.Connect(context.Background(), []string{servers[serverId]}, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -202,7 +247,7 @@ func TestDisconnectAll(t *testing.T) { server1 := servers[0] server2 := servers[1] - connPool, err := pool.Connect([]string{server1, server2}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{server1, server2}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -249,14 +294,14 @@ func TestDisconnectAll(t *testing.T) { } func TestAdd(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() for _, server := range servers[1:] { - err = connPool.Add(server) + err = connPool.Add(context.Background(), server) require.Nil(t, err) } @@ -280,13 +325,13 @@ func TestAdd(t *testing.T) { } func TestAdd_exist(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - err = connPool.Add(servers[0]) + err = connPool.Add(context.Background(), servers[0]) require.Equal(t, pool.ErrExists, err) args := test_helpers.CheckStatusesArgs{ @@ -305,13 +350,13 @@ func TestAdd_exist(t *testing.T) { } func TestAdd_unreachable(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - err = connPool.Add("127.0.0.2:6667") + err = connPool.Add(context.Background(), "127.0.0.2:6667") // The OS-dependent error so we just check for existence. require.NotNil(t, err) @@ -331,17 +376,17 @@ func TestAdd_unreachable(t *testing.T) { } func TestAdd_afterClose(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") connPool.Close() - err = connPool.Add(servers[0]) + err = connPool.Add(context.Background(), servers[0]) assert.Equal(t, err, pool.ErrClosed) } func TestAdd_Close_concurrent(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -350,7 +395,7 @@ func TestAdd_Close_concurrent(t *testing.T) { go func() { defer wg.Done() - err = connPool.Add(servers[1]) + err = connPool.Add(context.Background(), servers[1]) if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -362,7 +407,7 @@ func TestAdd_Close_concurrent(t *testing.T) { } func TestAdd_CloseGraceful_concurrent(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -371,7 +416,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { go func() { defer wg.Done() - err = connPool.Add(servers[1]) + err = connPool.Add(context.Background(), servers[1]) if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -383,7 +428,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { } func TestRemove(t *testing.T) { - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -410,7 +455,7 @@ func TestRemove(t *testing.T) { } func TestRemove_double(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0], servers[1]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0], servers[1]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -437,7 +482,7 @@ func TestRemove_double(t *testing.T) { } func TestRemove_unknown(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0], servers[1]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0], servers[1]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -463,7 +508,7 @@ func TestRemove_unknown(t *testing.T) { } func TestRemove_concurrent(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0], servers[1]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0], servers[1]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -510,7 +555,7 @@ func TestRemove_concurrent(t *testing.T) { } func TestRemove_Close_concurrent(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0], servers[1]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0], servers[1]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -529,7 +574,7 @@ func TestRemove_Close_concurrent(t *testing.T) { } func TestRemove_CloseGraceful_concurrent(t *testing.T) { - connPool, err := pool.Connect([]string{servers[0], servers[1]}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{servers[0], servers[1]}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -551,7 +596,7 @@ func TestClose(t *testing.T) { server1 := servers[0] server2 := servers[1] - connPool, err := pool.Connect([]string{server1, server2}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{server1, server2}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -591,7 +636,7 @@ func TestCloseGraceful(t *testing.T) { server1 := servers[0] server2 := servers[1] - connPool, err := pool.Connect([]string{server1, server2}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{server1, server2}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -730,7 +775,7 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - connPool, err := pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(context.Background(), poolServers, connOpts, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -804,7 +849,7 @@ func TestConnectionHandlerOpenError(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - connPool, err := pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(context.Background(), poolServers, connOpts, poolOpts) if err == nil { defer connPool.Close() } @@ -846,7 +891,7 @@ func TestConnectionHandlerUpdateError(t *testing.T) { CheckTimeout: 100 * time.Microsecond, ConnectionHandler: h, } - connPool, err := pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(context.Background(), poolServers, connOpts, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -891,7 +936,7 @@ func TestRequestOnClosed(t *testing.T) { server1 := servers[0] server2 := servers[1] - connPool, err := pool.Connect([]string{server1, server2}, connOpts) + connPool, err := pool.Connect(context.Background(), []string{server1, server2}, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -930,7 +975,7 @@ func TestGetPoolInfo(t *testing.T) { srvs := []string{server1, server2} expected := []string{server1, server2} - connPool, err := pool.Connect(srvs, connOpts) + connPool, err := pool.Connect(context.Background(), srvs, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -948,7 +993,7 @@ func TestCall(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1005,7 +1050,7 @@ func TestCall16(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1062,7 +1107,7 @@ func TestCall17(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1119,7 +1164,7 @@ func TestEval(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1197,7 +1242,7 @@ func TestExecute(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1254,7 +1299,7 @@ func TestRoundRobinStrategy(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1331,7 +1376,7 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1402,7 +1447,7 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1485,7 +1530,7 @@ func TestUpdateInstancesRoles(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1629,7 +1674,7 @@ func TestInsert(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1728,7 +1773,7 @@ func TestDelete(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1792,7 +1837,7 @@ func TestUpsert(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1864,7 +1909,7 @@ func TestUpdate(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1954,7 +1999,7 @@ func TestReplace(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2040,7 +2085,7 @@ func TestSelect(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2160,7 +2205,7 @@ func TestPing(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2198,7 +2243,7 @@ func TestDo(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2237,7 +2282,7 @@ func TestDo_concurrent(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2268,7 +2313,7 @@ func TestNewPrepared(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2336,7 +2381,7 @@ func TestDoWithStrangerConn(t *testing.T) { err := test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2365,7 +2410,7 @@ func TestStream_Commit(t *testing.T) { err = test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2464,7 +2509,7 @@ func TestStream_Rollback(t *testing.T) { err = test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2563,7 +2608,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) { err = test_helpers.SetClusterRO(servers, connOpts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, connOpts) + connPool, err := pool.Connect(context.Background(), servers, connOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2657,7 +2702,7 @@ func TestConnectionPool_NewWatcher_noWatchersFeature(t *testing.T) { err := test_helpers.SetClusterRO(servers, opts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, opts) + connPool, err := pool.Connect(context.Background(), servers, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2685,7 +2730,7 @@ func TestConnectionPool_NewWatcher_modes(t *testing.T) { err := test_helpers.SetClusterRO(servers, opts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, opts) + connPool, err := pool.Connect(context.Background(), servers, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2767,7 +2812,7 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { poolOpts := pool.Opts{ CheckTimeout: 500 * time.Millisecond, } - pool, err := pool.ConnectWithOpts(servers, opts, poolOpts) + pool, err := pool.ConnectWithOpts(context.Background(), servers, opts, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") @@ -2851,7 +2896,7 @@ func TestWatcher_Unregister(t *testing.T) { err := test_helpers.SetClusterRO(servers, opts, roles) require.Nilf(t, err, "fail to set roles for cluster") - pool, err := pool.Connect(servers, opts) + pool, err := pool.Connect(context.Background(), servers, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") defer pool.Close() @@ -2910,7 +2955,7 @@ func TestConnectionPool_NewWatcher_concurrent(t *testing.T) { err := test_helpers.SetClusterRO(servers, opts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, opts) + connPool, err := pool.Connect(context.Background(), servers, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2950,7 +2995,7 @@ func TestWatcher_Unregister_concurrent(t *testing.T) { err := test_helpers.SetClusterRO(servers, opts, roles) require.Nilf(t, err, "fail to set roles for cluster") - connPool, err := pool.Connect(servers, opts) + connPool, err := pool.Connect(context.Background(), servers, opts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() diff --git a/pool/example_test.go b/pool/example_test.go index 84a41ff7b..75204f22b 100644 --- a/pool/example_test.go +++ b/pool/example_test.go @@ -1,6 +1,7 @@ package pool_test import ( + "context" "fmt" "time" @@ -24,7 +25,8 @@ func examplePool(roles []bool, connOpts tarantool.Opts) (*pool.ConnectionPool, e if err != nil { return nil, fmt.Errorf("ConnectionPool is not established") } - connPool, err := pool.Connect(servers, connOpts) + ctx := context.Background() + connPool, err := pool.Connect(ctx, servers, connOpts) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") } diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index 51fb967a5..5795b2682 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -1,6 +1,7 @@ package queue_test import ( + "context" "fmt" "sync" "sync/atomic" @@ -164,7 +165,7 @@ func Example_connectionPool() { CheckTimeout: 5 * time.Second, ConnectionHandler: h, } - connPool, err := pool.ConnectWithOpts(servers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(context.Background(), servers, connOpts, poolOpts) if err != nil { fmt.Printf("Unable to connect to the pool: %s", err) return diff --git a/queue/example_msgpack_test.go b/queue/example_msgpack_test.go index 6fd101e09..4737c1cf6 100644 --- a/queue/example_msgpack_test.go +++ b/queue/example_msgpack_test.go @@ -9,6 +9,7 @@ package queue_test import ( + "context" "fmt" "log" "time" @@ -55,7 +56,7 @@ func Example_simpleQueueCustomMsgPack() { User: "test", Pass: "test", } - conn, err := tarantool.Connect("127.0.0.1:3013", opts) + conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts) if err != nil { log.Fatalf("connection: %s", err) return diff --git a/queue/example_test.go b/queue/example_test.go index 711ee31d4..7eeac8c43 100644 --- a/queue/example_test.go +++ b/queue/example_test.go @@ -9,6 +9,7 @@ package queue_test import ( + "context" "fmt" "time" @@ -31,7 +32,7 @@ func Example_simpleQueue() { Pass: "test", } - conn, err := tarantool.Connect("127.0.0.1:3013", opts) + conn, err := tarantool.Connect(context.Background(), "127.0.0.1:3013", opts) if err != nil { fmt.Printf("error in prepare is %v", err) return diff --git a/settings/example_test.go b/settings/example_test.go index b1d0e5d4f..1116443e2 100644 --- a/settings/example_test.go +++ b/settings/example_test.go @@ -1,6 +1,7 @@ package settings_test import ( + "context" "fmt" "github.com/tarantool/go-tarantool/v2" @@ -9,7 +10,7 @@ import ( ) func example_connect(opts tarantool.Opts) *tarantool.Connection { - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { panic("Connection is not established: " + err.Error()) } diff --git a/shutdown_test.go b/shutdown_test.go index bb4cfa099..066393810 100644 --- a/shutdown_test.go +++ b/shutdown_test.go @@ -6,6 +6,7 @@ package tarantool_test import ( + "context" "fmt" "sync" "syscall" @@ -462,7 +463,7 @@ func TestGracefulShutdownCloseConcurrent(t *testing.T) { // Do not wait till Tarantool register out watcher, // test everything is ok even on async. - conn, err := Connect(shtdnServer, shtdnClntOpts) + conn, err := Connect(context.Background(), shtdnServer, shtdnClntOpts) if err != nil { t.Errorf("Failed to connect: %s", err) } else { diff --git a/ssl_test.go b/ssl_test.go index 30078703c..9d01244ef 100644 --- a/ssl_test.go +++ b/ssl_test.go @@ -4,6 +4,7 @@ package tarantool_test import ( + "context" "errors" "fmt" "io/ioutil" @@ -150,7 +151,7 @@ func serverTntStop(inst test_helpers.TarantoolInstance) { } func checkTntConn(clientOpts SslOpts) error { - conn, err := Connect(tntHost, Opts{ + conn, err := Connect(context.Background(), tntHost, Opts{ Auth: AutoAuth, Timeout: 500 * time.Millisecond, User: "test", diff --git a/tarantool_test.go b/tarantool_test.go index 6339164f1..363cd1f63 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -778,7 +778,7 @@ func TestOptsAuth_PapSha256AuthForbit(t *testing.T) { papSha256Opts := opts papSha256Opts.Auth = PapSha256Auth - conn, err := Connect(server, papSha256Opts) + conn, err := Connect(context.Background(), server, papSha256Opts) if err == nil { t.Error("An error expected.") conn.Close() @@ -3408,7 +3408,7 @@ func TestConnectionProtocolVersionRequirementSuccess(t *testing.T) { Version: ProtocolVersion(3), } - conn, err := Connect(server, connOpts) + conn, err := Connect(context.Background(), server, connOpts) require.Nilf(t, err, "No errors on connect") require.NotNilf(t, conn, "Connect success") @@ -3424,7 +3424,7 @@ func TestConnectionProtocolVersionRequirementFail(t *testing.T) { Version: ProtocolVersion(3), } - conn, err := Connect(server, connOpts) + conn, err := Connect(context.Background(), server, connOpts) require.Nilf(t, conn, "Connect fail") require.NotNilf(t, err, "Got error on connect") @@ -3439,7 +3439,7 @@ func TestConnectionProtocolFeatureRequirementSuccess(t *testing.T) { Features: []ProtocolFeature{TransactionsFeature}, } - conn, err := Connect(server, connOpts) + conn, err := Connect(context.Background(), server, connOpts) require.NotNilf(t, conn, "Connect success") require.Nilf(t, err, "No errors on connect") @@ -3455,7 +3455,7 @@ func TestConnectionProtocolFeatureRequirementFail(t *testing.T) { Features: []ProtocolFeature{TransactionsFeature}, } - conn, err := Connect(server, connOpts) + conn, err := Connect(context.Background(), server, connOpts) require.Nilf(t, conn, "Connect fail") require.NotNilf(t, err, "Got error on connect") @@ -3471,7 +3471,7 @@ func TestConnectionProtocolFeatureRequirementManyFail(t *testing.T) { Features: []ProtocolFeature{TransactionsFeature, ProtocolFeature(15532)}, } - conn, err := Connect(server, connOpts) + conn, err := Connect(context.Background(), server, connOpts) require.Nilf(t, conn, "Connect fail") require.NotNilf(t, err, "Got error on connect") @@ -4003,7 +4003,7 @@ func TestConnect_schema_update(t *testing.T) { for i := 0; i < 100; i++ { fut := conn.Do(NewCallRequest("create_spaces")) - if conn, err := Connect(server, opts); err != nil { + if conn, err := Connect(context.Background(), server, opts); err != nil { if err.Error() != "concurrent schema update" { t.Errorf("unexpected error: %s", err) } @@ -4019,6 +4019,48 @@ func TestConnect_schema_update(t *testing.T) { } } +func TestConnect_context_cancel(t *testing.T) { + var connLongReconnectOpts = Opts{ + Timeout: 5 * time.Second, + User: "test", + Pass: "test", + Reconnect: time.Second, + MaxReconnects: 100, + } + + ctx, cancel := context.WithCancel(context.Background()) + + var conn *Connection + var err error + + go func() { + conn, err = Connect(ctx, "err", connLongReconnectOpts) + }() + + time.Sleep(5 * time.Second) + + if err != nil && conn != nil { + t.Fatalf("Connect ended before cancel and reconnects") + } + cancel() + + retErr := test_helpers.Retry(func(interface{}) error { + if err == nil { + return fmt.Errorf("didn't get the error from Connect") + } + return nil + }, nil, 10, time.Second) + + if conn != nil { + t.Fatalf("ConnectionPool was created after cancel") + } + if clErr, ok := err.(ClientError); retErr != nil || !ok || + (clErr.Code != ErrConnectionClosed && clErr.Msg != "context is canceled") { + t.Fatalf("Wrong error, exected %s, got %v", + "no active connections", err) + } +} + // runTestMain is a body of TestMain function // (see https://pkg.go.dev/testing#hdr-Main). // Using defer + os.Exit is not works so TestMain body diff --git a/test_helpers/main.go b/test_helpers/main.go index 894ebb653..90145ef46 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -11,6 +11,7 @@ package test_helpers import ( + "context" "errors" "fmt" "io" @@ -97,7 +98,7 @@ func isReady(server string, opts *tarantool.Opts) error { var conn *tarantool.Connection var resp *tarantool.Response - conn, err = tarantool.Connect(server, *opts) + conn, err = tarantool.Connect(context.Background(), server, *opts) if err != nil { return err } diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go index c44df2f6a..7028512b8 100644 --- a/test_helpers/pool_helper.go +++ b/test_helpers/pool_helper.go @@ -1,6 +1,7 @@ package test_helpers import ( + "context" "fmt" "reflect" "time" @@ -132,7 +133,7 @@ func Retry(f func(interface{}) error, args interface{}, count int, timeout time. func InsertOnInstance(server string, connOpts tarantool.Opts, space interface{}, tuple interface{}) error { - conn, err := tarantool.Connect(server, connOpts) + conn, err := tarantool.Connect(context.Background(), server, connOpts) if err != nil { return fmt.Errorf("fail to connect to %s: %s", server, err.Error()) } @@ -192,7 +193,7 @@ func InsertOnInstances(servers []string, connOpts tarantool.Opts, space interfac } func SetInstanceRO(server string, connOpts tarantool.Opts, isReplica bool) error { - conn, err := tarantool.Connect(server, connOpts) + conn, err := tarantool.Connect(context.Background(), server, connOpts) if err != nil { return err } diff --git a/test_helpers/utils.go b/test_helpers/utils.go index 3771a5f9e..8967da4a0 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -1,6 +1,7 @@ package test_helpers import ( + "context" "fmt" "testing" "time" @@ -17,7 +18,7 @@ func ConnectWithValidation(t testing.TB, opts tarantool.Opts) *tarantool.Connection { t.Helper() - conn, err := tarantool.Connect(server, opts) + conn, err := tarantool.Connect(context.Background(), server, opts) if err != nil { t.Fatalf("Failed to connect: %s", err.Error()) } diff --git a/uuid/example_test.go b/uuid/example_test.go index 632f620be..ed5b68ae8 100644 --- a/uuid/example_test.go +++ b/uuid/example_test.go @@ -9,6 +9,7 @@ package uuid_test import ( + "context" "fmt" "log" @@ -25,7 +26,8 @@ func Example() { User: "test", Pass: "test", } - client, err := tarantool.Connect("127.0.0.1:3013", opts) + ctx := context.Background() + client, err := tarantool.Connect(ctx, "127.0.0.1:3013", opts) if err != nil { log.Fatalf("Failed to connect: %s", err.Error()) }