Skip to content

Commit 20fb507

Browse files
committed
[ADDED] Exponential backoff for server reconnects
1 parent 7917595 commit 20fb507

File tree

5 files changed

+107
-35
lines changed

5 files changed

+107
-35
lines changed

nats.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ const (
5050
Version = "1.24.0"
5151
DefaultURL = "nats://127.0.0.1:4222"
5252
DefaultPort = 4222
53-
DefaultMaxReconnect = 60
54-
DefaultReconnectWait = 2 * time.Second
53+
DefaultMaxReconnect = -1
5554
DefaultReconnectJitter = 100 * time.Millisecond
5655
DefaultReconnectJitterTLS = time.Second
5756
DefaultTimeout = 2 * time.Second
@@ -62,6 +61,10 @@ const (
6261
RequestChanLen = 8
6362
DefaultDrainTimeout = 30 * time.Second
6463
LangString = "go"
64+
65+
// DEPRECATED: Client now uses [nats.DefaultReconnectBackoffHandler] to
66+
// handle default reconnect wait time.
67+
DefaultReconnectWait = 2 * time.Second
6568
)
6669

6770
const (
@@ -143,17 +146,17 @@ var (
143146
// GetDefaultOptions returns default configuration options for the client.
144147
func GetDefaultOptions() Options {
145148
return Options{
146-
AllowReconnect: true,
147-
MaxReconnect: DefaultMaxReconnect,
148-
ReconnectWait: DefaultReconnectWait,
149-
ReconnectJitter: DefaultReconnectJitter,
150-
ReconnectJitterTLS: DefaultReconnectJitterTLS,
151-
Timeout: DefaultTimeout,
152-
PingInterval: DefaultPingInterval,
153-
MaxPingsOut: DefaultMaxPingOut,
154-
SubChanLen: DefaultMaxChanLen,
155-
ReconnectBufSize: DefaultReconnectBufSize,
156-
DrainTimeout: DefaultDrainTimeout,
149+
AllowReconnect: true,
150+
MaxReconnect: DefaultMaxReconnect,
151+
ReconnectJitter: DefaultReconnectJitter,
152+
ReconnectJitterTLS: DefaultReconnectJitterTLS,
153+
Timeout: DefaultTimeout,
154+
PingInterval: DefaultPingInterval,
155+
MaxPingsOut: DefaultMaxPingOut,
156+
SubChanLen: DefaultMaxChanLen,
157+
ReconnectBufSize: DefaultReconnectBufSize,
158+
DrainTimeout: DefaultDrainTimeout,
159+
IgnoreAuthErrorAbort: true,
157160
}
158161
}
159162

@@ -470,6 +473,7 @@ type Options struct {
470473

471474
// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
472475
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
476+
// DEPRECATED: This option will be removed in future releases.
473477
IgnoreAuthErrorAbort bool
474478

475479
// SkipHostLookup skips the DNS lookup for the server hostname.
@@ -1260,13 +1264,22 @@ func CustomInboxPrefix(p string) Option {
12601264

12611265
// IgnoreAuthErrorAbort opts out of the default connect behavior of aborting
12621266
// subsequent reconnect attempts if server returns the same auth error twice.
1267+
// DEPRECATED: This option is now set to 'true' by default, therefore this option will be removed in future releases.
12631268
func IgnoreAuthErrorAbort() Option {
12641269
return func(o *Options) error {
12651270
o.IgnoreAuthErrorAbort = true
12661271
return nil
12671272
}
12681273
}
12691274

1275+
// AbortOnAuthErrors causes the client to bail out after 2 subsequent auth connection errors.
1276+
func AbortOnAuthErrors() Option {
1277+
return func(o *Options) error {
1278+
o.IgnoreAuthErrorAbort = false
1279+
return nil
1280+
}
1281+
}
1282+
12701283
// SkipHostLookup is an Option to skip the host lookup when connecting to a server.
12711284
func SkipHostLookup() Option {
12721285
return func(o *Options) error {
@@ -2559,6 +2572,28 @@ func (nc *Conn) stopPingTimer() {
25592572
}
25602573
}
25612574

2575+
// DefaultReconnectBackoffHandler returns a default reconnect exponential backoff interval.
2576+
// Base reconnect wait is 10ms, with x2 multiplier. Max wait time is 2 minutes.
2577+
// 10ms, 20ms, 40ms, 80ms...2m
2578+
// A random jitter is added to the result.
2579+
func DefaultReconnectBackoffHandler(jitter time.Duration) ReconnectDelayHandler {
2580+
return func(attempts int) time.Duration {
2581+
// base interval is 10ms
2582+
backoff := 10 * time.Millisecond
2583+
for i := 0; i < attempts-1; i++ {
2584+
backoff *= 2
2585+
if backoff > 2*time.Minute {
2586+
backoff = 2 * time.Minute
2587+
break
2588+
}
2589+
}
2590+
if jitter > 0 {
2591+
jitter = time.Duration(rand.Int63n(int64(jitter)))
2592+
}
2593+
return backoff + jitter
2594+
}
2595+
}
2596+
25622597
// Try to reconnect using the option parameters.
25632598
// This function assumes we are allowed to reconnect.
25642599
func (nc *Conn) doReconnect(err error) {
@@ -2596,18 +2631,19 @@ func (nc *Conn) doReconnect(err error) {
25962631
var wlf int
25972632

25982633
var jitter time.Duration
2599-
var rw time.Duration
26002634
// If a custom reconnect delay handler is set, this takes precedence.
26012635
crd := nc.Opts.CustomReconnectDelayCB
2602-
if crd == nil {
2603-
rw = nc.Opts.ReconnectWait
2636+
rw := nc.Opts.ReconnectWait
2637+
if crd == nil && rw == 0 {
26042638
// TODO: since we sleep only after the whole list has been tried, we can't
26052639
// rely on individual *srv to know if it is a TLS or non-TLS url.
26062640
// We have to pick which type of jitter to use, for now, we use these hints:
26072641
jitter = nc.Opts.ReconnectJitter
26082642
if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
26092643
jitter = nc.Opts.ReconnectJitterTLS
26102644
}
2645+
2646+
crd = DefaultReconnectBackoffHandler(jitter)
26112647
}
26122648

26132649
for i := 0; len(nc.srvPool) > 0; {

nats_test.go

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ func TestSelectNextServer(t *testing.T) {
511511
opts := GetDefaultOptions()
512512
opts.Servers = testServers
513513
opts.NoRandomize = true
514+
opts.MaxReconnect = 60
514515
nc := &Conn{Opts: opts}
515516
if err := nc.setupServerPool(); err != nil {
516517
t.Fatalf("Problem setting up Server Pool: %v\n", err)
@@ -1609,14 +1610,14 @@ func TestExpiredAuthentication(t *testing.T) {
16091610
name string
16101611
expectedProto string
16111612
expectedErr error
1612-
ignoreAbort bool
1613+
withAuthAbort bool
16131614
}{
1614-
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
1615-
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
1616-
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
16171615
{"expired users credentials", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, true},
16181616
{"revoked users credentials", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, true},
16191617
{"expired account", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, true},
1618+
{"expired users credentials, abort connection", AUTHENTICATION_EXPIRED_ERR, ErrAuthExpired, false},
1619+
{"revoked users credentials, abort connection", AUTHENTICATION_REVOKED_ERR, ErrAuthRevoked, false},
1620+
{"expired account, abort connection", ACCOUNT_AUTHENTICATION_EXPIRED_ERR, ErrAccountAuthExpired, false},
16201621
} {
16211622
t.Run(test.name, func(t *testing.T) {
16221623
l, e := net.Listen("tcp", "127.0.0.1:0")
@@ -1678,16 +1679,16 @@ func TestExpiredAuthentication(t *testing.T) {
16781679
ch <- true
16791680
}),
16801681
}
1681-
if test.ignoreAbort {
1682-
opts = append(opts, IgnoreAuthErrorAbort())
1682+
if test.withAuthAbort {
1683+
opts = append(opts, AbortOnAuthErrors())
16831684
}
16841685
nc, err := Connect(url, opts...)
16851686
if err != nil {
16861687
t.Fatalf("Expected to connect, got %v", err)
16871688
}
16881689
defer nc.Close()
16891690

1690-
if test.ignoreAbort {
1691+
if !test.withAuthAbort {
16911692
// We expect more than 3 errors, as the connect attempt should not be aborted after 2 failed attempts.
16921693
for i := 0; i < 4; i++ {
16931694
select {
@@ -2171,7 +2172,7 @@ func BenchmarkNextMsgNoTimeout(b *testing.B) {
21712172
}
21722173
}
21732174

2174-
func TestAuthErrorOnReconnect(t *testing.T) {
2175+
func TestAuthErrorOnReconnectWithAuthErrorAbort(t *testing.T) {
21752176
// This is a bit of an artificial test, but it is to demonstrate
21762177
// that if the client is disconnected from a server (not due to an auth error),
21772178
// it will still correctly stop the reconnection logic if it gets twice an
@@ -2199,6 +2200,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
21992200
MaxReconnects(-1),
22002201
DontRandomize(),
22012202
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
2203+
AbortOnAuthErrors(),
22022204
DisconnectErrHandler(func(_ *Conn, e error) {
22032205
dch <- true
22042206
}),
@@ -2948,6 +2950,49 @@ func TestInProcessConn(t *testing.T) {
29482950
}
29492951
}
29502952

2953+
func TestDefaultReconnectBackoffHandler(t *testing.T) {
2954+
tests := []struct {
2955+
name string
2956+
attempts int
2957+
jitter time.Duration
2958+
expectedRange []time.Duration
2959+
}{
2960+
{
2961+
name: "4 attempts, no jitter",
2962+
attempts: 4,
2963+
expectedRange: []time.Duration{80 * time.Millisecond},
2964+
},
2965+
{
2966+
name: "1 attempt, no jitter, return base value",
2967+
attempts: 1,
2968+
expectedRange: []time.Duration{10 * time.Millisecond},
2969+
},
2970+
{
2971+
name: "100 attempts, no jitter, return max",
2972+
attempts: 100,
2973+
expectedRange: []time.Duration{2 * time.Minute},
2974+
},
2975+
{
2976+
name: "4 attempts, with jitter",
2977+
attempts: 4,
2978+
jitter: 20 * time.Millisecond,
2979+
expectedRange: []time.Duration{80 * time.Millisecond, 99 * time.Millisecond},
2980+
},
2981+
}
2982+
2983+
for _, test := range tests {
2984+
t.Run(test.name, func(t *testing.T) {
2985+
cb := DefaultReconnectBackoffHandler(test.jitter)
2986+
res := cb(test.attempts)
2987+
if test.jitter == 0 {
2988+
if res != test.expectedRange[0] {
2989+
t.Fatalf("Invalid result; want: %s; got: %s", test.expectedRange[0], res)
2990+
}
2991+
}
2992+
})
2993+
}
2994+
}
2995+
29512996
func TestServerListWithTrailingComma(t *testing.T) {
29522997
s := RunServerOnPort(-1)
29532998
defer s.Shutdown()

test/conn_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2564,6 +2564,7 @@ func TestRetryOnFailedConnect(t *testing.T) {
25642564
nats.RetryOnFailedConnect(true),
25652565
nats.MaxReconnects(-1),
25662566
nats.ReconnectWait(15*time.Millisecond),
2567+
nats.AbortOnAuthErrors(),
25672568
nats.ReconnectHandler(func(_ *nats.Conn) {
25682569
ch <- true
25692570
}),

test/js_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8629,7 +8629,7 @@ func TestJetStreamOrderedConsumerRecreateAfterReconnect(t *testing.T) {
86298629
}
86308630
hbMissed <- struct{}{}
86318631
}
8632-
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler))
8632+
nc, js := jsClient(t, s, nats.ErrorHandler(errHandler), nats.ReconnectWait(500*time.Millisecond))
86338633
defer nc.Close()
86348634

86358635
if _, err := js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}); err != nil {

test/reconnect_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,6 @@ func startReconnectServer(t *testing.T) *server.Server {
3030
return RunServerOnPort(22222)
3131
}
3232

33-
func TestReconnectTotalTime(t *testing.T) {
34-
opts := nats.GetDefaultOptions()
35-
totalReconnectTime := time.Duration(opts.MaxReconnect) * opts.ReconnectWait
36-
if totalReconnectTime < (2 * time.Minute) {
37-
t.Fatalf("Total reconnect time should be at least 2 mins: Currently %v\n",
38-
totalReconnectTime)
39-
}
40-
}
41-
4233
func TestDefaultReconnectJitter(t *testing.T) {
4334
opts := nats.GetDefaultOptions()
4435
if opts.ReconnectJitter != nats.DefaultReconnectJitter {
@@ -123,7 +114,6 @@ var reconnectOpts = nats.Options{
123114
Url: "nats://127.0.0.1:22222",
124115
AllowReconnect: true,
125116
MaxReconnect: 10,
126-
ReconnectWait: 100 * time.Millisecond,
127117
Timeout: nats.DefaultTimeout,
128118
}
129119

0 commit comments

Comments
 (0)