Skip to content

Commit

Permalink
Created the Connection Pool interface and converted the existing conn…
Browse files Browse the repository at this point in the history
…ection pool logic to a SimplePool interface.
  • Loading branch information
Phillip Couto committed Apr 30, 2014
1 parent e617f1d commit 3783dd1
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 224 deletions.
214 changes: 7 additions & 207 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ package gocql

import (
"errors"
"fmt"
"log"
"strings"
"sync"
"time"
)

Expand All @@ -31,6 +27,7 @@ type ClusterConfig struct {
Authenticator Authenticator // authenticator (default: nil)
RetryPolicy RetryPolicy // Default retry policy to use for queries (default: 0)
SocketKeepalive time.Duration // The keepalive period to use, enabled if > 0 (default: 0)
ConnPoolType NewPoolFunc // The function used to create the connection pool for the session (default: NewSimplePool)
}

// NewCluster generates a new config for the default cluster implementation.
Expand All @@ -44,6 +41,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
NumConns: 2,
NumStreams: 128,
Consistency: Quorum,
ConnPoolType: NewSimplePool,
}
return cfg
}
Expand All @@ -56,219 +54,21 @@ func (cfg *ClusterConfig) CreateSession() (*Session, error) {
if len(cfg.Hosts) < 1 {
return nil, ErrNoHosts
}
pool := cfg.ConnPoolType(cfg)

impl := &clusterImpl{
cfg: *cfg,
hostPool: NewRoundRobin(),
connPool: make(map[string]*RoundRobin),
conns: make(map[*Conn]struct{}),
quitWait: make(chan bool),
cFillingPool: make(chan int, 1),
keyspace: cfg.Keyspace,
}
//Walk through connecting to hosts. As soon as one host connects
//defer the remaining connections to cluster.fillPool()
for i := 0; i < len(impl.cfg.Hosts); i++ {
addr := strings.TrimSpace(impl.cfg.Hosts[i])
if strings.Index(addr, ":") < 0 {
addr = fmt.Sprintf("%s:%d", addr, impl.cfg.DefaultPort)
}
err := impl.connect(addr)
if err == nil {
impl.cFillingPool <- 1
go impl.fillPool()
break
}

}
//See if there are any connections in the pool
impl.mu.Lock()
conns := len(impl.conns)
impl.mu.Unlock()
if conns > 0 {
s := NewSession(impl)
if pool.Size() > 0 {
s := NewSession(pool, cfg)
s.SetConsistency(cfg.Consistency)
return s, nil
}

impl.Close()
return nil, ErrNoConnectionsStarted

}

type clusterImpl struct {
cfg ClusterConfig
hostPool *RoundRobin
connPool map[string]*RoundRobin
conns map[*Conn]struct{}
keyspace string
mu sync.Mutex

cFillingPool chan int

quit bool
quitWait chan bool
quitOnce sync.Once
}

func (c *clusterImpl) connect(addr string) error {
cfg := ConnConfig{
ProtoVersion: c.cfg.ProtoVersion,
CQLVersion: c.cfg.CQLVersion,
Timeout: c.cfg.Timeout,
NumStreams: c.cfg.NumStreams,
Compressor: c.cfg.Compressor,
Authenticator: c.cfg.Authenticator,
Keepalive: c.cfg.SocketKeepalive,
}

for {
conn, err := Connect(addr, cfg, c)
if err != nil {
log.Printf("failed to connect to %q: %v", addr, err)
return err
}
return c.addConn(conn)
}
}

func (c *clusterImpl) addConn(conn *Conn) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.quit {
conn.Close()
return nil
}
//Set the connection's keyspace if any before adding it to the pool
if c.keyspace != "" {
if err := conn.UseKeyspace(c.keyspace); err != nil {
log.Printf("error setting connection keyspace. %v", err)
conn.Close()
return err
}
}
connPool := c.connPool[conn.Address()]
if connPool == nil {
connPool = NewRoundRobin()
c.connPool[conn.Address()] = connPool
c.hostPool.AddNode(connPool)
}
connPool.AddNode(conn)
c.conns[conn] = struct{}{}
return nil
}

//fillPool manages the pool of connections making sure that each host has the correct
//amount of connections defined. Also the method will test a host with one connection
//instead of flooding the host with number of connections defined in the cluster config
func (c *clusterImpl) fillPool() {
//Debounce large amounts of requests to fill pool
select {
case <-time.After(1 * time.Millisecond):
return
case <-c.cFillingPool:
defer func() { c.cFillingPool <- 1 }()
}

c.mu.Lock()
isClosed := c.quit
c.mu.Unlock()
//Exit if cluster(session) is closed
if isClosed {
return
}
//Walk through list of defined hosts
for i := 0; i < len(c.cfg.Hosts); i++ {
addr := strings.TrimSpace(c.cfg.Hosts[i])
if strings.Index(addr, ":") < 0 {
addr = fmt.Sprintf("%s:%d", addr, c.cfg.DefaultPort)
}
var numConns int = 1
//See if the host already has connections in the pool
c.mu.Lock()
conns, ok := c.connPool[addr]
c.mu.Unlock()

if ok {
//if the host has enough connections just exit
numConns = conns.Size()
if numConns >= c.cfg.NumConns {
continue
}
} else {
//See if the host is reachable
if err := c.connect(addr); err != nil {
continue
}
}
//This is reached if the host is responsive and needs more connections
//Create connections for host synchronously to mitigate flooding the host.
go func(a string, conns int) {
for ; conns < c.cfg.NumConns; conns++ {
c.connect(addr)
}
}(addr, numConns)
}
}

// Should only be called if c.mu is locked
func (c *clusterImpl) removeConnLocked(conn *Conn) {
conn.Close()
connPool := c.connPool[conn.addr]
if connPool == nil {
return
}
connPool.RemoveNode(conn)
if connPool.Size() == 0 {
c.hostPool.RemoveNode(connPool)
delete(c.connPool, conn.addr)
}
delete(c.conns, conn)
}

func (c *clusterImpl) removeConn(conn *Conn) {
c.mu.Lock()
defer c.mu.Unlock()
c.removeConnLocked(conn)
}

func (c *clusterImpl) HandleError(conn *Conn, err error, closed bool) {
if !closed {
// ignore all non-fatal errors
return
}
c.removeConn(conn)
if !c.quit {
go c.fillPool() // top off pool.
}
}

func (c *clusterImpl) Pick(qry *Query) *Conn {
//Check if connections are available
c.mu.Lock()
conns := len(c.conns)
c.mu.Unlock()

if conns == 0 {
//try to populate the pool before returning.
c.fillPool()
}

return c.hostPool.Pick(qry)
}

func (c *clusterImpl) Close() {
c.quitOnce.Do(func() {
c.mu.Lock()
defer c.mu.Unlock()
c.quit = true
close(c.quitWait)
for conn := range c.conns {
c.removeConnLocked(conn)
}
})
}

var (
ErrNoHosts = errors.New("no hosts provided")
ErrNoHosts = errors.New("no hosts provided")
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
)
12 changes: 4 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ const defaultFrameSize = 4096
const flagResponse = 0x80
const maskVersion = 0x7F

type Cluster interface {
HandleError(conn *Conn, err error, closed bool)
}

type Authenticator interface {
Challenge(req []byte) (resp []byte, auth Authenticator, err error)
Success(data []byte) error
Expand Down Expand Up @@ -72,7 +68,7 @@ type Conn struct {
prepMu sync.Mutex
prep map[string]*inflightPrepare

cluster Cluster
pool ConnectionPool
compressor Compressor
auth Authenticator
addr string
Expand All @@ -84,7 +80,7 @@ type Conn struct {

// Connect establishes a connection to a Cassandra node.
// You must also call the Serve method before you can execute any queries.
func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
conn, err := net.DialTimeout("tcp", addr, cfg.Timeout)
if err != nil {
return nil, err
Expand All @@ -105,7 +101,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
timeout: cfg.Timeout,
version: uint8(cfg.ProtoVersion),
addr: conn.RemoteAddr().String(),
cluster: cluster,
pool: pool,
compressor: cfg.Compressor,
auth: cfg.Authenticator,
}
Expand Down Expand Up @@ -203,7 +199,7 @@ func (c *Conn) serve() {
req.resp <- callResp{nil, err}
}
}
c.cluster.HandleError(c, err, true)
c.pool.HandleError(c, err, true)
}

func (c *Conn) recv() (frame, error) {
Expand Down
7 changes: 2 additions & 5 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,8 @@ func TestConnClosing(t *testing.T) {
wg.Wait()

time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
cluster := db.Node.(*clusterImpl)

cluster.mu.Lock()
conns := len(cluster.conns)
cluster.mu.Unlock()
pool := db.Pool.(ConnectionPool)
conns := pool.Size()

if conns != numConns {
t.Fatalf("Expected to have %d connections but have %d", numConns, conns)
Expand Down
Loading

0 comments on commit 3783dd1

Please sign in to comment.