Skip to content

Commit

Permalink
Merge pull request apache#723 from matope/attempt-to-reconnect-known-…
Browse files Browse the repository at this point in the history
…down-nodes

Attempt to reconnect known DOWN nodes
  • Loading branch information
Zariel committed May 2, 2016
2 parents 70000bc + 2ea86d3 commit 2e0390b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 3 deletions.
23 changes: 22 additions & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package gocql

import (
"bytes"
"golang.org/x/net/context"
"io"
"math"
"math/big"
Expand All @@ -17,6 +16,8 @@ import (
"time"
"unicode"

"golang.org/x/net/context"

"gopkg.in/inf.v0"
)

Expand Down Expand Up @@ -558,6 +559,26 @@ func TestCreateSessionTimeout(t *testing.T) {
}
}

// TestReconnection verifies that a host which has been marked down is brought
// back up by the session's periodic reconnection loop.
//
// It marks the first host in the ring as down via handleNodeDown and then
// waits for its state to return to NodeUp.
func TestReconnection(t *testing.T) {
	cluster := createCluster()
	cluster.ReconnectInterval = 1 * time.Second
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	// Guard against an empty ring so the test fails with a clear message
	// instead of panicking on the index expression below.
	hosts := session.ring.allHosts()
	if len(hosts) == 0 {
		t.Fatal("no hosts in ring")
	}

	h := hosts[0]
	session.handleNodeDown(net.ParseIP(h.Peer()), h.Port())

	if h.State() != NodeDown {
		t.Fatal("Host should be NodeDown but not.")
	}

	// Poll until the host is back up rather than always sleeping for the
	// worst case; the overall deadline matches the time the reconnect loop
	// is given: one interval, the node-up delay, and a second of slack.
	deadline := time.Now().Add(cluster.ReconnectInterval + h.Version().nodeUpDelay() + 1*time.Second)
	for h.State() != NodeUp && time.Now().Before(deadline) {
		time.Sleep(100 * time.Millisecond)
	}

	if h.State() != NodeUp {
		t.Fatal("Host should be NodeUp but not. Failed to reconnect.")
	}
}

type FullName struct {
FirstName string
LastName string
Expand Down
4 changes: 4 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type ClusterConfig struct {

Discovery DiscoveryConfig

// If not zero, gocql attempts to reconnect known DOWN nodes every ReconnectInterval.
ReconnectInterval time.Duration

// The maximum amount of time to wait for schema agreement in a cluster after
// receiving a schema change frame. (default: 60s)
MaxWaitSchemaAgreement time.Duration
Expand Down Expand Up @@ -126,6 +129,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
}
return cfg
}
Expand Down
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"golang.org/x/net/context"
"io"
"io/ioutil"
"log"
Expand All @@ -20,6 +19,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"

"github.com/gocql/gocql/internal/lru"

"github.com/gocql/gocql/internal/streams"
Expand Down
6 changes: 6 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
}

func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
if gocqlDebug {
log.Printf("gocql: Session.handleNodeUp: %s:%d\n", ip.String(), port)
}
addr := ip.String()
host := s.ring.getHost(addr)
if host != nil {
Expand Down Expand Up @@ -275,6 +278,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
}

func (s *Session) handleNodeDown(ip net.IP, port int) {
if gocqlDebug {
log.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
}
addr := ip.String()
host := s.ring.getHost(addr)
if host == nil {
Expand Down
32 changes: 31 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"golang.org/x/net/context"
"io"
"log"
"net"
"strconv"
"strings"
Expand All @@ -19,6 +19,8 @@ import (
"time"
"unicode"

"golang.org/x/net/context"

"github.com/gocql/gocql/internal/lru"
)

Expand Down Expand Up @@ -175,6 +177,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
}
}

if cfg.ReconnectInterval > 0 {
go s.reconnectDownedHosts(cfg.ReconnectInterval)
}

// TODO(zariel): we probably dont need this any more as we verify that we
// can connect to one of the endpoints supplied by using the control conn.
// See if there are any connections in the pool
Expand All @@ -188,6 +194,30 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
return s, nil
}

// reconnectDownedHosts periodically attempts to bring hosts that are not up
// back into service. Each iteration sleeps for intv and then calls
// handleNodeUp for every host in the ring for which IsUp reports false.
//
// The loop runs until the session is closed, so it blocks its caller for the
// lifetime of the session; NewSession runs it in its own goroutine when
// ReconnectInterval is greater than zero.
func (s *Session) reconnectDownedHosts(intv time.Duration) {
	for !s.Closed() {
		time.Sleep(intv)

		// The session may have been closed while this goroutine was asleep;
		// bail out instead of starting reconnect attempts on a closed session.
		if s.Closed() {
			return
		}

		hosts := s.ring.allHosts()

		// Print session.ring for debug.
		if gocqlDebug {
			buf := bytes.NewBufferString("Session.ring:")
			for _, h := range hosts {
				buf.WriteString("[" + h.Peer() + ":" + h.State().String() + "]")
			}
			log.Println(buf.String())
		}

		for _, h := range hosts {
			if h.IsUp() {
				continue
			}
			s.handleNodeUp(net.ParseIP(h.Peer()), h.Port(), true)
		}
	}
}

// SetConsistency sets the default consistency level for this session. This
// setting can also be changed on a per-query basis and the default value
// is Quorum.
Expand Down

0 comments on commit 2e0390b

Please sign in to comment.