Skip to content

Commit

Permalink
new api draft
Browse files Browse the repository at this point in the history
  • Loading branch information
askalt committed Nov 7, 2023
1 parent c564e6d commit 4a6c7a5
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 118 deletions.
59 changes: 12 additions & 47 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
reconnects := v[0].(uint)
err := v[1].(error)
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
reconnects, conn.opts.MaxReconnects, conn.addr, err)
reconnects, conn.opts.MaxReconnects, conn.c.RemoteAddr(), err)
case LogLastReconnectFailed:
err := v[0].(error)
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.addr, err)
conn.c.RemoteAddr(), err)
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.addr, resp.RequestId)
conn.c.RemoteAddr(), resp.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
Expand Down Expand Up @@ -156,10 +156,10 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c Conn
mutex sync.Mutex
cond *sync.Cond
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -260,11 +260,6 @@ const (

// Opts is a way to configure Connection
type Opts struct {
// Auth is an authentication method.
Auth Auth
// Dialer is a Dialer object used to create a new connection to a
// Tarantool instance. TtDialer is a default one.
Dialer Dialer
// Timeout for response to a particular request. The timeout is reset when
// push messages are received. If Timeout is zero, any request can be
// blocked infinitely.
Expand All @@ -287,10 +282,6 @@ type Opts struct {
// endlessly.
// After MaxReconnects attempts Connection becomes closed.
MaxReconnects uint
// Username for logging in to Tarantool.
User string
// User password for logging in to Tarantool.
Pass string
// RateLimit limits number of 'in-fly' request, i.e. already put into
// requests queue, but not yet answered by server or timeouted.
// It is disabled by default.
Expand All @@ -315,14 +306,6 @@ type Opts struct {
Handle interface{}
// Logger is user specified logger used for error messages.
Logger Logger
// Transport is the connection type, by default the connection is unencrypted.
Transport string
// SslOpts is used only if the Transport == 'ssl' is set.
Ssl SslOpts
// RequiredProtocolInfo contains minimal protocol version and
// list of protocol features that should be supported by
// Tarantool server. By default there are no restrictions.
RequiredProtocolInfo ProtocolInfo
}

// SslOpts is a way to configure ssl transport.
Expand Down Expand Up @@ -360,8 +343,6 @@ type SslOpts struct {
// RequiredProtocolInfo value.
func (opts Opts) Clone() Opts {
optsCopy := opts
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()

return optsCopy
}

Expand All @@ -375,9 +356,9 @@ func (opts Opts) Clone() Opts {
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
dialer: dialer,
requestId: 0,
contextRequestId: 1,
Greeting: &Greeting{},
Expand All @@ -389,9 +370,6 @@ func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
conn.opts.Concurrency = maxprocs * 4
}
if conn.opts.Dialer == nil {
conn.opts.Dialer = TtDialer{}
}
if c := conn.opts.Concurrency; c&(c-1) != 0 {
for i := uint(1); i < 32; i *= 2 {
c |= c >> i
Expand Down Expand Up @@ -471,11 +449,6 @@ func (conn *Connection) CloseGraceful() error {
return conn.shutdown(true)
}

// Addr returns a configured address of Tarantool socket.
func (conn *Connection) Addr() string {
return conn.addr
}

// RemoteAddr returns an address of Tarantool socket.
func (conn *Connection) RemoteAddr() string {
conn.mutex.Lock()
Expand Down Expand Up @@ -512,15 +485,7 @@ func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts

var c Conn
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
RequiredProtocol: opts.RequiredProtocolInfo,
Auth: opts.Auth,
User: opts.User,
Password: opts.Pass,
})
c, err := conn.dialer.Dial(ctx, DialOpts{IoTimeout: opts.Timeout})
if err != nil {
return err
}
Expand Down Expand Up @@ -1474,7 +1439,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// That's why we can't just check the Tarantool response for an unsupported
// request error.
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.opts.RequiredProtocolInfo.Features) {
conn.serverProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
return nil, err
Expand Down Expand Up @@ -1580,7 +1545,7 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
// Since 1.10.0
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
info := clientProtocolInfo.Clone()
info.Auth = conn.opts.Auth
info.Auth = conn.serverProtocolInfo.Auth
return info
}

Expand Down
Loading

0 comments on commit 4a6c7a5

Please sign in to comment.