Skip to content

Commit

Permalink
Add reconnection policies (apache#1077)
Browse files Browse the repository at this point in the history
* test commit

* stub

* add reconnection policy

* add debug message

* add conviction policy interface and tests

* changes for a new branch

* fix code comment and default setting

* fix broken test

* change default setting

* remove conviction policy (now in apache#1081), add myself in AUTHORS

* fix code comment

* fix variable naming for ConstantReconnectionPolicy
  • Loading branch information
changliuau authored and Zariel committed Mar 20, 2018
1 parent 2c8c099 commit 12e3a8c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 28 deletions.
3 changes: 2 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ Sascha Steinbiss <satta@debian.org>
Seth Rosenblum <seth.t.rosenblum@gmail.com>
Javier Zunzunegui <javier.zunzunegui.b@gmail.com>
Luke Hines <lukehines@protonmail.com>
Zhixin Wen <john.wenzhixin@hotmail.com>
Zhixin Wen <john.wenzhixin@hotmail.com>
Chang Liu <changliu.it@gmail.com>
36 changes: 19 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,24 @@ type ClusterConfig struct {
// If it is 0 or unset (the default) then the driver will attempt to discover the
// highest supported protocol for the cluster. In clusters with nodes of different
// versions the protocol selected is not defined (ie, it can be any of the supported in the cluster)
ProtoVersion int
Timeout time.Duration // connection timeout (default: 600ms)
ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms)
Port int // port (default: 9042)
Keyspace string // initial keyspace (optional)
NumConns int // number of connections per host (default: 2)
Consistency Consistency // default consistency level (default: Quorum)
Compressor Compressor // compression algorithm (default: nil)
Authenticator Authenticator // authenticator (default: nil)
RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0)
SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000)
PageSize int // Default page size to use for created sessions (default: 5000)
SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
SslOpts *SslOptions
DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
ProtoVersion int
Timeout time.Duration // connection timeout (default: 600ms)
ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms)
Port int // port (default: 9042)
Keyspace string // initial keyspace (optional)
NumConns int // number of connections per host (default: 2)
Consistency Consistency // default consistency level (default: Quorum)
Compressor Compressor // compression algorithm (default: nil)
Authenticator Authenticator // authenticator (default: nil)
RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0)
ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below)
SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000)
PageSize int // Default page size to use for created sessions (default: 5000)
SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
SslOpts *SslOptions
DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
// PoolConfig configures the underlying connection pool, allowing the
// configuration of host selection and connection selection policies.
PoolConfig PoolConfig
Expand Down Expand Up @@ -151,6 +152,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
}
return cfg
}
Expand Down
3 changes: 3 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"testing"
"time"
"reflect"
)

func TestNewCluster_Defaults(t *testing.T) {
Expand All @@ -19,6 +20,8 @@ func TestNewCluster_Defaults(t *testing.T) {
assertEqual(t, "cluster config default timestamp", true, cfg.DefaultTimestamp)
assertEqual(t, "cluster config max wait schema agreement", 60*time.Second, cfg.MaxWaitSchemaAgreement)
assertEqual(t, "cluster config reconnect interval", 60*time.Second, cfg.ReconnectInterval)
assertTrue(t, "cluster config reconnection policy",
reflect.DeepEqual(&ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, cfg.ReconnectionPolicy))
}

func TestNewCluster_WithHosts(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,10 @@ func (pool *hostConnPool) connectMany(count int) error {
func (pool *hostConnPool) connect() (err error) {
// TODO: provide a more robust connection retry mechanism, we should also
// be able to detect hosts that come up by trying to connect to downed ones.
const maxAttempts = 3
// try to connect
var conn *Conn
for i := 0; i < maxAttempts; i++ {
reconnectionPolicy := pool.session.cfg.ReconnectionPolicy
for i := 0; i < reconnectionPolicy.GetMaxRetries(); i++ {
conn, err = pool.session.connect(pool.host, pool)
if err == nil {
break
Expand All @@ -512,6 +512,11 @@ func (pool *hostConnPool) connect() (err error) {
break
}
}
if gocqlDebug {
Logger.Printf("connection failed %q: %v, reconnecting with %T\n",
pool.host.ConnectAddress(), err, reconnectionPolicy)
}
time.Sleep(reconnectionPolicy.GetInterval(i))
}

if err != nil {
Expand Down
63 changes: 55 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,28 @@ func (e *ExponentialBackoffRetryPolicy) Attempt(q RetryableQuery) bool {
return true
}

func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
if e.Min <= 0 {
e.Min = 100 * time.Millisecond
// used to calculate exponentially growing time
func getExponentialTime(min time.Duration, max time.Duration, attempts int) time.Duration {
if min <= 0 {
min = 100 * time.Millisecond
}
if e.Max <= 0 {
e.Max = 10 * time.Second
if max <= 0 {
max = 10 * time.Second
}
minFloat := float64(e.Min)
minFloat := float64(min)
napDuration := minFloat * math.Pow(2, float64(attempts-1))
// add some jitter
napDuration += rand.Float64()*minFloat - (minFloat / 2)
if napDuration > float64(e.Max) {
return time.Duration(e.Max)
if napDuration > float64(max) {
return time.Duration(max)
}
return time.Duration(napDuration)
}

// napTime returns how long to sleep before the given retry attempt. It
// delegates to getExponentialTime, passing the policy's Min and Max as the
// lower and upper bounds and attempts as the attempt count.
func (e *ExponentialBackoffRetryPolicy) napTime(attempts int) time.Duration {
return getExponentialTime(e.Min, e.Max, attempts)
}

type HostStateNotifier interface {
AddHost(host *HostInfo)
RemoveHost(host *HostInfo)
Expand Down Expand Up @@ -706,3 +711,45 @@ func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
return (*selectedHost)(host)
}
}

// ReconnectionPolicy is used by gocql to decide how often a failed connection
// attempt is retried and how long to wait between attempts. Users can supply
// their own implementation to control how reconnection is attempted.
type ReconnectionPolicy interface {
// GetInterval returns how long to wait after a failed connection attempt.
// currentRetry identifies the attempt that just failed; the connection pool
// passes its zero-based loop index.
GetInterval(currentRetry int) time.Duration
// GetMaxRetries returns the upper bound on the number of connection
// attempts the connection pool makes before giving up.
GetMaxRetries() int
}

// ConstantReconnectionPolicy is a reconnection policy that reports the same
// fixed Interval for every retry and a fixed MaxRetries limit.
//
// Example:
//
//	cluster.ReconnectionPolicy = &gocql.ConstantReconnectionPolicy{MaxRetries: 10, Interval: 8 * time.Second}
type ConstantReconnectionPolicy struct {
	// MaxRetries is the value reported by GetMaxRetries.
	MaxRetries int
	// Interval is the delay reported by GetInterval for every retry.
	Interval time.Duration
}

// GetMaxRetries reports the configured MaxRetries.
func (p *ConstantReconnectionPolicy) GetMaxRetries() int {
	limit := p.MaxRetries
	return limit
}

// GetInterval reports the configured Interval. The delay is constant, so
// currentRetry does not influence the result.
func (p *ConstantReconnectionPolicy) GetInterval(currentRetry int) time.Duration {
	delay := p.Interval
	return delay
}

// ExponentialReconnectionPolicy is a reconnection policy whose wait time
// grows with the retry number, starting from InitialInterval.
type ExponentialReconnectionPolicy struct {
	// MaxRetries is the value reported by GetMaxRetries.
	MaxRetries int
	// InitialInterval is the base delay; it is handed to getExponentialTime
	// as the minimum interval.
	InitialInterval time.Duration
}

// GetInterval returns the delay to apply for the retry identified by
// currentRetry. The value is computed by getExponentialTime from
// InitialInterval and is capped at math.MaxInt16 seconds.
func (e *ExponentialReconnectionPolicy) GetInterval(currentRetry int) time.Duration {
	// The attempt count must be the retry in progress. Passing the retry
	// limit instead would yield the same (largest) delay on every call, so
	// the interval would never actually grow.
	return getExponentialTime(e.InitialInterval, math.MaxInt16*time.Second, currentRetry)
}

// GetMaxRetries reports the configured MaxRetries.
func (e *ExponentialReconnectionPolicy) GetMaxRetries() int {
	return e.MaxRetries
}

0 comments on commit 12e3a8c

Please sign in to comment.