Skip to content

Commit

Permalink
Added ConnectTimeout for initial connections to nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Feb 9, 2017
1 parent 4d2d1ac commit baa79dc
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 26 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,4 @@ Nathan Davies <nathanjamesdavies@gmail.com>
Bo Blanton <bo.blanton@gmail.com>
Vincent Rischmann <me@vrischmann.me>
Jesse Claven <jesse.claven@gmail.com>
Derrick Wippler <thrawn01@gmail.com>
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ClusterConfig struct {
// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
ProtoVersion int
Timeout time.Duration // connection timeout (default: 600ms)
ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms)
Port int // port (default: 9042)
Keyspace string // initial keyspace (optional)
NumConns int // number of connections per host (default: 2)
Expand Down Expand Up @@ -132,6 +133,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 600 * time.Millisecond,
ConnectTimeout: 600 * time.Millisecond,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
Expand Down
21 changes: 11 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,14 @@ type SslOptions struct {
}

type ConnConfig struct {
ProtoVersion int
CQLVersion string
Timeout time.Duration
Compressor Compressor
Authenticator Authenticator
Keepalive time.Duration
tlsConfig *tls.Config
ProtoVersion int
CQLVersion string
Timeout time.Duration
ConnectTimeout time.Duration
Compressor Compressor
Authenticator Authenticator
Keepalive time.Duration
tlsConfig *tls.Config
}

type ConnErrorHandler interface {
Expand Down Expand Up @@ -167,7 +168,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
)

dialer := &net.Dialer{
Timeout: cfg.Timeout,
Timeout: cfg.ConnectTimeout,
}

// TODO(zariel): handle ipv6 zone
Expand Down Expand Up @@ -212,8 +213,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
ctx context.Context
cancel func()
)
if c.timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), c.timeout)
if cfg.ConnectTimeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), cfg.ConnectTimeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
Expand Down
65 changes: 59 additions & 6 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,50 @@ func newTestSession(addr string, proto protoVersion) (*Session, error) {
return testCluster(addr, proto).CreateSession()
}

func TestStartupTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
log := &testLogger{}
Logger = log
defer func() {
Logger = &defaultLogger{}
}()

srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()

// Tell the server to never respond to Startup frame
atomic.StoreInt32(&srv.TimeoutOnStartup, 1)

startTime := time.Now()
cluster := NewCluster(srv.Address)
cluster.ProtoVersion = int(defaultProto)
cluster.disableControlConn = true
// Set very long query connection timeout
// so we know CreateSession() is using the ConnectTimeout
cluster.Timeout = time.Second * 5

// Create session should timeout during connect attempt
_, err := cluster.CreateSession()
if err == nil {
t.Fatal("CreateSession() should have returned a timeout error")
}

elapsed := time.Since(startTime)
if elapsed > time.Second*5 {
t.Fatal("ConnectTimeout is not respected")
}

if !strings.Contains(err.Error(), "no connections were made when creating the session") {
t.Fatalf("Expected to receive no connections error - got '%s'", err)
}

if !strings.Contains(log.String(), "no response to connection startup within timeout") {
t.Fatalf("Expected to receive timeout log message - got '%s'", log.String())
}

cancel()
}

func TestTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -619,12 +663,13 @@ func NewSSLTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestSe
}

type TestServer struct {
Address string
t testing.TB
nreq uint64
listen net.Listener
nKillReq int64
compressor Compressor
Address string
TimeoutOnStartup int32
t testing.TB
nreq uint64
listen net.Listener
nKillReq int64
compressor Compressor

protocol byte
headerSize int
Expand Down Expand Up @@ -738,6 +783,14 @@ func (srv *TestServer) process(f *framer) {

switch head.op {
case opStartup:
if atomic.LoadInt32(&srv.TimeoutOnStartup) > 0 {
// Do not respond to startup command
// wait until we get a cancel signal
select {
case <-srv.ctx.Done():
return
}
}
f.writeHeader(0, opReady, head.stream)
case opOptions:
f.writeHeader(0, opSupported, head.stream)
Expand Down
19 changes: 10 additions & 9 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
}

return &ConnConfig{
ProtoVersion: cfg.ProtoVersion,
CQLVersion: cfg.CQLVersion,
Timeout: cfg.Timeout,
Compressor: cfg.Compressor,
Authenticator: cfg.Authenticator,
Keepalive: cfg.SocketKeepalive,
tlsConfig: tlsConfig,
ProtoVersion: cfg.ProtoVersion,
CQLVersion: cfg.CQLVersion,
Timeout: cfg.Timeout,
ConnectTimeout: cfg.ConnectTimeout,
Compressor: cfg.Compressor,
Authenticator: cfg.Authenticator,
Keepalive: cfg.SocketKeepalive,
tlsConfig: tlsConfig,
}, nil
}

Expand Down Expand Up @@ -395,8 +396,8 @@ func (pool *hostConnPool) fill() {
// probably unreachable host
pool.fillingStopped(true)

// this is calle with the connetion pool mutex held, this call will
// then recursivly try to lock it again. FIXME
// this is call with the connection pool mutex held, this call will
// then recursively try to lock it again. FIXME
go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
return
}
Expand Down
15 changes: 14 additions & 1 deletion logger.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
package gocql

import "log"
import (
"bytes"
"fmt"
"log"
)

type StdLogger interface {
Print(v ...interface{})
Printf(format string, v ...interface{})
Println(v ...interface{})
}

type testLogger struct {
capture bytes.Buffer
}

func (l *testLogger) Print(v ...interface{}) { fmt.Fprint(&l.capture, v...) }
func (l *testLogger) Printf(format string, v ...interface{}) { fmt.Fprintf(&l.capture, format, v...) }
func (l *testLogger) Println(v ...interface{}) { fmt.Fprintln(&l.capture, v...) }
func (l *testLogger) String() string { return l.capture.String() }

type defaultLogger struct{}

func (l *defaultLogger) Print(v ...interface{}) { log.Print(v...) }
Expand Down

0 comments on commit baa79dc

Please sign in to comment.