diff --git a/cassandra_test.go b/cassandra_test.go index 2d297c5f8..90b719253 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -4,7 +4,6 @@ package gocql import ( "bytes" - "golang.org/x/net/context" "io" "math" "math/big" @@ -17,6 +16,8 @@ import ( "time" "unicode" + "golang.org/x/net/context" + "gopkg.in/inf.v0" ) @@ -558,6 +559,26 @@ func TestCreateSessionTimeout(t *testing.T) { } } +func TestReconnection(t *testing.T) { + cluster := createCluster() + cluster.ReconnectInterval = 1 * time.Second + session := createSessionFromCluster(cluster, t) + defer session.Close() + + h := session.ring.allHosts()[0] + session.handleNodeDown(net.ParseIP(h.Peer()), h.Port()) + + if h.State() != NodeDown { + t.Fatal("Host should be NodeDown but not.") + } + + time.Sleep(cluster.ReconnectInterval + h.Version().nodeUpDelay() + 1*time.Second) + + if h.State() != NodeUp { + t.Fatal("Host should be NodeUp but not. Failed to reconnect.") + } +} + type FullName struct { FirstName string LastName string diff --git a/cluster.go b/cluster.go index d311f3d40..c84f376cd 100644 --- a/cluster.go +++ b/cluster.go @@ -72,6 +72,9 @@ type ClusterConfig struct { Discovery DiscoveryConfig + // If not zero, gocql attempt to reconnect known DOWN nodes in every ReconnectSleep. + ReconnectInterval time.Duration + // The maximum amount of time to wait for schema agreement in a cluster after // receiving a schema change frame. (deault: 60s) MaxWaitSchemaAgreement time.Duration @@ -126,6 +129,7 @@ func NewCluster(hosts ...string) *ClusterConfig { PageSize: 5000, DefaultTimestamp: true, MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, } return cfg } diff --git a/conn.go b/conn.go index ea1b6bfa1..4c6030d40 100644 --- a/conn.go +++ b/conn.go @@ -9,7 +9,6 @@ import ( "crypto/tls" "errors" "fmt" - "golang.org/x/net/context" "io" "io/ioutil" "log" @@ -20,6 +19,8 @@ import ( "sync/atomic" "time" + "golang.org/x/net/context" + "github.com/gocql/gocql/internal/lru" "github.com/gocql/gocql/internal/streams" diff --git a/events.go b/events.go index 863b4cd57..8d8a67f44 100644 --- a/events.go +++ b/events.go @@ -244,6 +244,9 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) { } func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { + if gocqlDebug { + log.Printf("gocql: Session.handleNodeUp: %s:%d\n", ip.String(), port) + } addr := ip.String() host := s.ring.getHost(addr) if host != nil { @@ -275,6 +278,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { } func (s *Session) handleNodeDown(ip net.IP, port int) { + if gocqlDebug { + log.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port) + } addr := ip.String() host := s.ring.getHost(addr) if host == nil { diff --git a/session.go b/session.go index a8621881c..df249e317 100644 --- a/session.go +++ b/session.go @@ -9,8 +9,8 @@ import ( "encoding/binary" "errors" "fmt" - "golang.org/x/net/context" "io" + "log" "net" "strconv" "strings" @@ -19,6 +19,8 @@ import ( "time" "unicode" + "golang.org/x/net/context" + "github.com/gocql/gocql/internal/lru" ) @@ -175,6 +177,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } } + if cfg.ReconnectInterval > 0 { + go s.reconnectDownedHosts(cfg.ReconnectInterval) + } + // TODO(zariel): we probably dont need this any more as we verify that we // can connect to one of the endpoints supplied by using the control conn. // See if there are any connections in the pool @@ -188,6 +194,30 @@ func NewSession(cfg ClusterConfig) (*Session, error) { return s, nil } +func (s *Session) reconnectDownedHosts(intv time.Duration) { + for !s.Closed() { + time.Sleep(intv) + + hosts := s.ring.allHosts() + + // Print session.ring for debug. + if gocqlDebug { + buf := bytes.NewBufferString("Session.ring:") + for _, h := range hosts { + buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]") + } + log.Println(buf.String()) + } + + for _, h := range hosts { + if h.IsUp() { + continue + } + s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true) + } + } +} + // SetConsistency sets the default consistency level for this session. This // setting can also be changed on a per-query basis and the default value // is Quorum.