Skip to content

Commit

Permalink
Merge pull request apache#1280 from kiwicom/ms/gocql-all-dials-observ…
Browse files Browse the repository at this point in the history
…able

Make sure all connections are observed
  • Loading branch information
alourie authored Mar 25, 2019
2 parents 6bdac5e + 0b5041c commit fc3925a
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 17 deletions.
30 changes: 28 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,34 @@ type Conn struct {
timeouts int64
}

// Connect establishes a connection to a Cassandra node.
func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
// connect establishes a connection to a Cassandra node using session's connection config.
func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
return s.dial(host, s.connCfg, errorHandler)
}

// dial establishes a connection to a Cassandra node and notifies the session's connectObserver.
func (s *Session) dial(host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
var obs ObservedConnect
if s.connectObserver != nil {
obs.Host = host
obs.Start = time.Now()
}

conn, err := s.dialWithoutObserver(host, connConfig, errorHandler)

if s.connectObserver != nil {
obs.End = time.Now()
obs.Err = err
s.connectObserver.ObserveConnect(obs)
}

return conn, err
}

// dialWithoutObserver establishes connection to a Cassandra node.
//
// dialWithoutObserver does not notify the connection observer, so you most probably want to call dial() instead.
func (s *Session) dialWithoutObserver(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
ip := host.ConnectAddress()
port := host.port

Expand Down
15 changes: 0 additions & 15 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,21 +656,6 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{})
return applied, iter, iter.err
}

func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
if s.connectObserver != nil {
obs := ObservedConnect{
Host: host,
Start: time.Now(),
}
conn, err := s.dial(host, s.connCfg, errorHandler)
obs.End = time.Now()
obs.Err = err
s.connectObserver.ObserveConnect(obs)
return conn, err
}
return s.dial(host, s.connCfg, errorHandler)
}

type hostMetrics struct {
Attempts int
TotalLatency int64
Expand Down

0 comments on commit fc3925a

Please sign in to comment.