Skip to content

Commit

Permalink
Attempt to reconnect known DOWN nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
matope committed May 2, 2016
1 parent 70000bc commit a0223bb
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 3 deletions.
23 changes: 22 additions & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package gocql

import (
"bytes"
"golang.org/x/net/context"
"io"
"math"
"math/big"
Expand All @@ -17,6 +16,8 @@ import (
"time"
"unicode"

"golang.org/x/net/context"

"gopkg.in/inf.v0"
)

Expand Down Expand Up @@ -558,6 +559,26 @@ func TestCreateSessionTimeout(t *testing.T) {
}
}

func TestReconnection(t *testing.T) {
cluster := createCluster()
cluster.ReconnectInterval = 1 * time.Second
session := createSessionFromCluster(cluster, t)
defer session.Close()

h := session.ring.allHosts()[0]
session.handleNodeDown(net.ParseIP(h.peer), h.port)

if h.State() != NodeDown {
t.Fatal("Host should be NodeDown but not.")
}

time.Sleep(cluster.ReconnectInterval + h.Version().nodeUpDelay() + 1*time.Second)

if h.State() != NodeUp {
t.Fatal("Host should be NodeUp but not. Failed to reconnect.")
}
}

type FullName struct {
FirstName string
LastName string
Expand Down
4 changes: 4 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type ClusterConfig struct {

Discovery DiscoveryConfig

// If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectSleep.
ReconnectInterval time.Duration

// The maximum amount of time to wait for schema agreement in a cluster after
// receiving a schema change frame. (deault: 60s)
MaxWaitSchemaAgreement time.Duration
Expand Down Expand Up @@ -126,6 +129,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
}
return cfg
}
Expand Down
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"golang.org/x/net/context"
"io"
"io/ioutil"
"log"
Expand All @@ -20,6 +19,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"

"github.com/gocql/gocql/internal/lru"

"github.com/gocql/gocql/internal/streams"
Expand Down
6 changes: 6 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
}

func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
if gocqlDebug {
log.Printf("gocql: Session.handleNodeUp: %s:%d\n", ip.String(), port)
}
addr := ip.String()
host := s.ring.getHost(addr)
if host != nil {
Expand Down Expand Up @@ -275,6 +278,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
}

func (s *Session) handleNodeDown(ip net.IP, port int) {
if gocqlDebug {
log.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
}
addr := ip.String()
host := s.ring.getHost(addr)
if host == nil {
Expand Down
1 change: 1 addition & 0 deletions events_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestEventNodeDownControl(t *testing.T) {
}

cluster := createCluster()
cluster.ReconnectInterval = 0
cluster.Hosts = []string{status[targetNode].Addr}
session := createSessionFromCluster(cluster, t)
defer session.Close()
Expand Down
29 changes: 28 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"golang.org/x/net/context"
"io"
"log"
"net"
"strconv"
"strings"
Expand All @@ -19,6 +19,8 @@ import (
"time"
"unicode"

"golang.org/x/net/context"

"github.com/gocql/gocql/internal/lru"
)

Expand Down Expand Up @@ -175,6 +177,31 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
}
}

if cfg.ReconnectInterval > 0 {
go func() {
for !s.Closed() {
time.Sleep(cfg.ReconnectInterval)
hosts := s.ring.allHosts()

// Print session.ring for debug.
if gocqlDebug {
buf := bytes.NewBufferString("Session.ring:")
for _, h := range hosts {
buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
}
log.Println(buf.String())
}

for _, h := range hosts {
if h.IsUp() {
continue
}
s.handleNodeUp(net.ParseIP(h.peer), h.port, true)
}
}
}()
}

// TODO(zariel): we probably dont need this any more as we verify that we
// can connect to one of the endpoints supplied by using the control conn.
// See if there are any connections in the pool
Expand Down

0 comments on commit a0223bb

Please sign in to comment.