Skip to content

Commit

Permalink
scylla: provide scyllaConnPicker as a Dialer, when applicable
Browse files Browse the repository at this point in the history
This commit actually causes the scyllaConnPicker to be used as a dialer
for establishing new connections.
  • Loading branch information
piodul authored and mmatczuk committed Nov 12, 2020
1 parent 4ca0628 commit 1690bbe
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
13 changes: 10 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,23 @@ func (s *Session) connect(ctx context.Context, host *HostInfo, errorHandler Conn
return s.dial(ctx, host, s.connCfg, errorHandler)
}

func (s *Session) connectWithDialer(ctx context.Context, host *HostInfo, errorHandler ConnErrorHandler, dialer Dialer) (*Conn, error) {
return s.dialWithDialer(ctx, host, s.connCfg, errorHandler, dialer)
}

// dial establishes a connection to a Cassandra node and notifies the session's connectObserver.
func (s *Session) dial(ctx context.Context, host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
return s.dialWithDialer(ctx, host, connConfig, errorHandler, connConfig.Dialer)
}

func (s *Session) dialWithDialer(ctx context.Context, host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler, dialer Dialer) (*Conn, error) {
var obs ObservedConnect
if s.connectObserver != nil {
obs.Host = host
obs.Start = time.Now()
}

conn, err := s.dialWithoutObserver(ctx, host, connConfig, errorHandler)
conn, err := s.dialWithoutObserver(ctx, host, connConfig, errorHandler, dialer)

if s.connectObserver != nil {
obs.End = time.Now()
Expand All @@ -194,7 +202,7 @@ func (s *Session) dial(ctx context.Context, host *HostInfo, connConfig *ConnConf
// dialWithoutObserver establishes connection to a Cassandra node.
//
// dialWithoutObserver does not notify the connection observer, so you most probably want to call dial() instead.
func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, dialer Dialer) (*Conn, error) {
ip := host.ConnectAddress()
port := host.port

Expand All @@ -205,7 +213,6 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
panic(fmt.Sprintf("host missing port: %v", port))
}

dialer := cfg.Dialer
if dialer == nil {
d := &net.Dialer{
Timeout: cfg.ConnectTimeout,
Expand Down
7 changes: 6 additions & 1 deletion connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,18 @@ func (pool *hostConnPool) connectMany(count int) error {

// create a new connection to the host and add it to the pool
func (pool *hostConnPool) connect() (err error) {
dialer, ok := pool.connPicker.(Dialer)
if !ok {
dialer = pool.session.cfg.Dialer
}

// TODO: provide a more robust connection retry mechanism, we should also
// be able to detect hosts that come up by trying to connect to downed ones.
// try to connect
var conn *Conn
reconnectionPolicy := pool.session.cfg.ReconnectionPolicy
for i := 0; i < reconnectionPolicy.GetMaxRetries(); i++ {
conn, err = pool.session.connect(pool.session.ctx, pool.host, pool)
conn, err = pool.session.connectWithDialer(pool.session.ctx, pool.host, pool, dialer)
if err == nil {
break
}
Expand Down

0 comments on commit 1690bbe

Please sign in to comment.