From a0223bbf7d374789b6f947ed5fb961fa9ce09e58 Mon Sep 17 00:00:00 2001 From: matope Date: Tue, 3 May 2016 00:05:51 +0900 Subject: [PATCH 1/2] Attempt to reconnect known DOWN nodes --- cassandra_test.go | 23 ++++++++++++++++++++++- cluster.go | 4 ++++ conn.go | 3 ++- events.go | 6 ++++++ events_ccm_test.go | 1 + session.go | 29 ++++++++++++++++++++++++++++- 6 files changed, 63 insertions(+), 3 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 2d297c5f8..6f96990b6 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -4,7 +4,6 @@ package gocql import ( "bytes" - "golang.org/x/net/context" "io" "math" "math/big" @@ -17,6 +16,8 @@ import ( "time" "unicode" + "golang.org/x/net/context" + "gopkg.in/inf.v0" ) @@ -558,6 +559,26 @@ func TestCreateSessionTimeout(t *testing.T) { } } +func TestReconnection(t *testing.T) { + cluster := createCluster() + cluster.ReconnectInterval = 1 * time.Second + session := createSessionFromCluster(cluster, t) + defer session.Close() + + h := session.ring.allHosts()[0] + session.handleNodeDown(net.ParseIP(h.peer), h.port) + + if h.State() != NodeDown { + t.Fatal("Host should be NodeDown but not.") + } + + time.Sleep(cluster.ReconnectInterval + h.Version().nodeUpDelay() + 1*time.Second) + + if h.State() != NodeUp { + t.Fatal("Host should be NodeUp but not. Failed to reconnect.") + } +} + type FullName struct { FirstName string LastName string diff --git a/cluster.go b/cluster.go index d311f3d40..c84f376cd 100644 --- a/cluster.go +++ b/cluster.go @@ -72,6 +72,9 @@ type ClusterConfig struct { Discovery DiscoveryConfig + // If not zero, gocql attempts to reconnect known DOWN nodes every ReconnectInterval. + ReconnectInterval time.Duration + // The maximum amount of time to wait for schema agreement in a cluster after // receiving a schema change frame. 
(deault: 60s) MaxWaitSchemaAgreement time.Duration @@ -126,6 +129,7 @@ func NewCluster(hosts ...string) *ClusterConfig { PageSize: 5000, DefaultTimestamp: true, MaxWaitSchemaAgreement: 60 * time.Second, + ReconnectInterval: 60 * time.Second, } return cfg } diff --git a/conn.go b/conn.go index ea1b6bfa1..4c6030d40 100644 --- a/conn.go +++ b/conn.go @@ -9,7 +9,6 @@ import ( "crypto/tls" "errors" "fmt" - "golang.org/x/net/context" "io" "io/ioutil" "log" @@ -20,6 +19,8 @@ import ( "sync/atomic" "time" + "golang.org/x/net/context" + "github.com/gocql/gocql/internal/lru" "github.com/gocql/gocql/internal/streams" diff --git a/events.go b/events.go index 863b4cd57..8d8a67f44 100644 --- a/events.go +++ b/events.go @@ -244,6 +244,9 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) { } func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { + if gocqlDebug { + log.Printf("gocql: Session.handleNodeUp: %s:%d\n", ip.String(), port) + } addr := ip.String() host := s.ring.getHost(addr) if host != nil { @@ -275,6 +278,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { } func (s *Session) handleNodeDown(ip net.IP, port int) { + if gocqlDebug { + log.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port) + } addr := ip.String() host := s.ring.getHost(addr) if host == nil { diff --git a/events_ccm_test.go b/events_ccm_test.go index 6ec0180c6..cd34750ad 100644 --- a/events_ccm_test.go +++ b/events_ccm_test.go @@ -51,6 +51,7 @@ func TestEventNodeDownControl(t *testing.T) { } cluster := createCluster() + cluster.ReconnectInterval = 0 cluster.Hosts = []string{status[targetNode].Addr} session := createSessionFromCluster(cluster, t) defer session.Close() diff --git a/session.go b/session.go index a8621881c..49d173f18 100644 --- a/session.go +++ b/session.go @@ -9,8 +9,8 @@ import ( "encoding/binary" "errors" "fmt" - "golang.org/x/net/context" "io" + "log" "net" "strconv" "strings" @@ -19,6 +19,8 @@ import ( "time" 
"unicode" + "golang.org/x/net/context" + "github.com/gocql/gocql/internal/lru" ) @@ -175,6 +177,31 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } } + if cfg.ReconnectInterval > 0 { + go func() { + for !s.Closed() { + time.Sleep(cfg.ReconnectInterval) + hosts := s.ring.allHosts() + + // Print session.ring for debug. + if gocqlDebug { + buf := bytes.NewBufferString("Session.ring:") + for _, h := range hosts { + buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]") + } + log.Println(buf.String()) + } + + for _, h := range hosts { + if h.IsUp() { + continue + } + s.handleNodeUp(net.ParseIP(h.peer), h.port, true) + } + } + }() + } + // TODO(zariel): we probably dont need this any more as we verify that we // can connect to one of the endpoints supplied by using the control conn. // See if there are any connections in the pool From 2ea86d3ae9baa67e82490af4c8caaf622f76794c Mon Sep 17 00:00:00 2001 From: matope Date: Tue, 3 May 2016 02:07:10 +0900 Subject: [PATCH 2/2] Move reconnection goroutine into a method --- cassandra_test.go | 2 +- events_ccm_test.go | 1 - session.go | 47 ++++++++++++++++++++++++---------------------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 6f96990b6..90b719253 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -566,7 +566,7 @@ func TestReconnection(t *testing.T) { defer session.Close() h := session.ring.allHosts()[0] - session.handleNodeDown(net.ParseIP(h.peer), h.port) + session.handleNodeDown(net.ParseIP(h.Peer()), h.Port()) if h.State() != NodeDown { t.Fatal("Host should be NodeDown but not.") diff --git a/events_ccm_test.go b/events_ccm_test.go index cd34750ad..6ec0180c6 100644 --- a/events_ccm_test.go +++ b/events_ccm_test.go @@ -51,7 +51,6 @@ func TestEventNodeDownControl(t *testing.T) { } cluster := createCluster() - cluster.ReconnectInterval = 0 cluster.Hosts = []string{status[targetNode].Addr} session := createSessionFromCluster(cluster, t) 
defer session.Close() diff --git a/session.go b/session.go index 49d173f18..df249e317 100644 --- a/session.go +++ b/session.go @@ -178,28 +178,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } if cfg.ReconnectInterval > 0 { - go func() { - for !s.Closed() { - time.Sleep(cfg.ReconnectInterval) - hosts := s.ring.allHosts() - - // Print session.ring for debug. - if gocqlDebug { - buf := bytes.NewBufferString("Session.ring:") - for _, h := range hosts { - buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]") - } - log.Println(buf.String()) - } - - for _, h := range hosts { - if h.IsUp() { - continue - } - s.handleNodeUp(net.ParseIP(h.peer), h.port, true) - } - } - }() + go s.reconnectDownedHosts(cfg.ReconnectInterval) } // TODO(zariel): we probably dont need this any more as we verify that we @@ -215,6 +194,30 @@ func NewSession(cfg ClusterConfig) (*Session, error) { return s, nil } +func (s *Session) reconnectDownedHosts(intv time.Duration) { + for !s.Closed() { + time.Sleep(intv) + + hosts := s.ring.allHosts() + + // Print session.ring for debug. + if gocqlDebug { + buf := bytes.NewBufferString("Session.ring:") + for _, h := range hosts { + buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]") + } + log.Println(buf.String()) + } + + for _, h := range hosts { + if h.IsUp() { + continue + } + s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true) + } + } +} + // SetConsistency sets the default consistency level for this session. This // setting can also be changed on a per-query basis and the default value // is Quorum.