Skip to content

Commit

Permalink
new api draft
Browse files Browse the repository at this point in the history
  • Loading branch information
askalt committed Nov 8, 2023
1 parent c564e6d commit ba56a18
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 352 deletions.
125 changes: 15 additions & 110 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
reconnects := v[0].(uint)
err := v[1].(error)
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
reconnects, conn.opts.MaxReconnects, conn.addr, err)
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
case LogLastReconnectFailed:
err := v[0].(error)
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.addr, err)
conn.Addr(), err)
case LogUnexpectedResultId:
resp := v[0].(*Response)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.addr, resp.RequestId)
conn.Addr(), resp.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
Expand Down Expand Up @@ -156,10 +156,10 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// More on graceful shutdown:
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c Conn
mutex sync.Mutex
cond *sync.Cond
dialer Dialer
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -260,11 +260,6 @@ const (

// Opts is a way to configure Connection
type Opts struct {
// Auth is an authentication method.
Auth Auth
// Dialer is a Dialer object used to create a new connection to a
// Tarantool instance. TtDialer is a default one.
Dialer Dialer
// Timeout for response to a particular request. The timeout is reset when
// push messages are received. If Timeout is zero, any request can be
// blocked infinitely.
Expand All @@ -287,10 +282,6 @@ type Opts struct {
// endlessly.
// After MaxReconnects attempts Connection becomes closed.
MaxReconnects uint
// Username for logging in to Tarantool.
User string
// User password for logging in to Tarantool.
Pass string
// RateLimit limits number of 'in-fly' request, i.e. already put into
// requests queue, but not yet answered by server or timeouted.
// It is disabled by default.
Expand All @@ -315,83 +306,23 @@ type Opts struct {
Handle interface{}
// Logger is user specified logger used for error messages.
Logger Logger
// Transport is the connection type, by default the connection is unencrypted.
Transport string
// SslOpts is used only if the Transport == 'ssl' is set.
Ssl SslOpts
// RequiredProtocolInfo contains minimal protocol version and
// list of protocol features that should be supported by
// Tarantool server. By default there are no restrictions.
RequiredProtocolInfo ProtocolInfo
}

// SslOpts is a way to configure ssl transport.
type SslOpts struct {
// KeyFile is a path to a private SSL key file.
KeyFile string
// CertFile is a path to an SSL certificate file.
CertFile string
// CaFile is a path to a trusted certificate authorities (CA) file.
CaFile string
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
// can use.
//
// We don't provide a list of supported ciphers. This is what OpenSSL
// does. The only limitation is usage of TLSv1.2 (because other protocol
// versions don't seem to support the GOST cipher). To add additional
// ciphers (GOST cipher), you must configure OpenSSL.
//
// See also
//
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
Ciphers string
// Password is a password for decrypting the private SSL key file.
// The priority is as follows: try to decrypt with Password, then
// try PasswordFile.
Password string
// PasswordFile is a path to the list of passwords for decrypting
// the private SSL key file. The connection tries every line from the
// file as a password.
PasswordFile string
}

// Clone returns a copy of the Opts object.
// Any changes in copy RequiredProtocolInfo will not affect the original
// RequiredProtocolInfo value.
func (opts Opts) Clone() Opts {
optsCopy := opts
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()

return optsCopy
}

// Connect creates and configures a new Connection.
//
// Address could be specified in following ways:
//
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
//
// - Unix socket, first '/' or '.' indicates Unix socket
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
conn = &Connection{
addr: addr,
dialer: dialer,
requestId: 0,
contextRequestId: 1,
Greeting: &Greeting{},
control: make(chan struct{}),
opts: opts.Clone(),
opts: opts,
dec: msgpack.NewDecoder(&smallBuf{}),
}
maxprocs := uint32(runtime.GOMAXPROCS(-1))
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
conn.opts.Concurrency = maxprocs * 4
}
if conn.opts.Dialer == nil {
conn.opts.Dialer = TtDialer{}
}
if c := conn.opts.Concurrency; c&(c-1) != 0 {
for i := uint(1); i < 32; i *= 2 {
c |= c >> i
Expand Down Expand Up @@ -473,27 +404,7 @@ func (conn *Connection) CloseGraceful() error {

// Addr returns a configured address of Tarantool socket.
func (conn *Connection) Addr() string {
return conn.addr
}

// RemoteAddr returns an address of Tarantool socket.
func (conn *Connection) RemoteAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.RemoteAddr().String()
}

// LocalAddr returns an address of outgoing socket.
func (conn *Connection) LocalAddr() string {
conn.mutex.Lock()
defer conn.mutex.Unlock()
if conn.c == nil {
return ""
}
return conn.c.LocalAddr().String()
return conn.c.GetAddr()
}

// Handle returns a user-specified handle from Opts.
Expand All @@ -512,14 +423,8 @@ func (conn *Connection) dial(ctx context.Context) error {
opts := conn.opts

var c Conn
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
IoTimeout: opts.Timeout,
Transport: opts.Transport,
Ssl: opts.Ssl,
RequiredProtocol: opts.RequiredProtocolInfo,
Auth: opts.Auth,
User: opts.User,
Password: opts.Pass,
c, err := conn.dialer.Dial(ctx, DialOpts{
IoTimeout: opts.Timeout,
})
if err != nil {
return err
Expand Down Expand Up @@ -1474,7 +1379,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// That's why we can't just check the Tarantool response for an unsupported
// request error.
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.opts.RequiredProtocolInfo.Features) {
conn.c.ProtocolInfo().Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
return nil, err
Expand Down Expand Up @@ -1580,7 +1485,7 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
// Since 1.10.0
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
info := clientProtocolInfo.Clone()
info.Auth = conn.opts.Auth
info.Auth = conn.serverProtocolInfo.Auth
return info
}

Expand Down
Loading

0 comments on commit ba56a18

Please sign in to comment.