Skip to content

Commit

Permalink
add queryExecutor which executes queries.
Browse files Browse the repository at this point in the history
Simplify how queries and batches are executed by making them both
implement the same interface which allows the queryExecutor to handle
both cases.

Remove conn selection policy and let the connection pool make decisions
about which conn to use.
  • Loading branch information
Zariel committed Mar 29, 2016
1 parent 277df5b commit a04d2a5
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 340 deletions.
28 changes: 13 additions & 15 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,14 @@ func TestBatchQueryInfo(t *testing.T) {
}
}

func getRandomConn(t *testing.T, session *Session) *Conn {
conn := session.getConn()
if conn == nil {
t.Fatal("unable to get a connection")
}
return conn
}

func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
if err := createTable(session, `CREATE TABLE gocql_test.`+table+` (
foo varchar,
Expand All @@ -1029,7 +1037,8 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
}

stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
_, conn := session.pool.Pick(nil)

conn := getRandomConn(t, session)

flight := new(inflightPrepare)
key := session.stmtsLRU.keyFor(conn.addr, "", stmt)
Expand Down Expand Up @@ -1060,7 +1069,7 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string

func TestPrepare_MissingSchemaPrepare(t *testing.T) {
s := createSession(t)
_, conn := s.pool.Pick(nil)
conn := getRandomConn(t, s)
defer s.Close()

insertQry := &Query{stmt: "INSERT INTO invalidschemaprep (val) VALUES (?)", values: []interface{}{5}, cons: s.cons,
Expand Down Expand Up @@ -1108,7 +1117,7 @@ func TestQueryInfo(t *testing.T) {
session := createSession(t)
defer session.Close()

_, conn := session.pool.Pick(nil)
conn := getRandomConn(t, session)
info, err := conn.prepareStatement("SELECT release_version, host_id FROM system.local WHERE key = ?", nil)

if err != nil {
Expand Down Expand Up @@ -1982,18 +1991,7 @@ func TestNegativeStream(t *testing.T) {
session := createSession(t)
defer session.Close()

var conn *Conn
for i := 0; i < 5; i++ {
if conn != nil {
break
}

_, conn = session.pool.Pick(nil)
}

if conn == nil {
t.Fatal("no connections available in the pool")
}
conn := getRandomConn(t, session)

const stream = -50
writer := frameWriterFunc(func(f *framer, streamID int) error {
Expand Down
16 changes: 1 addition & 15 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,10 @@ type PoolConfig struct {
// HostSelectionPolicy sets the policy for selecting which host to use for a
// given query (default: RoundRobinHostPolicy())
HostSelectionPolicy HostSelectionPolicy

// ConnSelectionPolicy sets the policy factory for selecting a connection to use for
// each host for a query (default: RoundRobinConnPolicy())
ConnSelectionPolicy func() ConnSelectionPolicy
}

func (p PoolConfig) buildPool(session *Session) *policyConnPool {
hostSelection := p.HostSelectionPolicy
if hostSelection == nil {
hostSelection = RoundRobinHostPolicy()
}

connSelection := p.ConnSelectionPolicy
if connSelection == nil {
connSelection = RoundRobinConnPolicy()
}

return newPolicyConnPool(session, hostSelection, connSelection)
return newPolicyConnPool(session)
}

type DiscoveryConfig struct {
Expand Down
1 change: 0 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ func TestPolicyConnPoolSSL(t *testing.T) {

cluster := createTestSslCluster(srv.Address, defaultProto, true)
cluster.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
cluster.PoolConfig.ConnSelectionPolicy = RoundRobinConnPolicy()

db, err := cluster.CreateSession()
if err != nil {
Expand Down
99 changes: 25 additions & 74 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -65,8 +66,6 @@ type policyConnPool struct {
keyspace string

mu sync.RWMutex
hostPolicy HostSelectionPolicy
connPolicy func() ConnSelectionPolicy
hostConnPools map[string]*hostConnPool

endpoints []string
Expand Down Expand Up @@ -99,17 +98,13 @@ func connConfig(session *Session) (*ConnConfig, error) {
}, nil
}

func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
connPolicy func() ConnSelectionPolicy) *policyConnPool {

func newPolicyConnPool(session *Session) *policyConnPool {
// create the pool
pool := &policyConnPool{
session: session,
port: session.cfg.Port,
numConns: session.cfg.NumConns,
keyspace: session.cfg.Keyspace,
hostPolicy: hostPolicy,
connPolicy: connPolicy,
hostConnPools: map[string]*hostConnPool{},
}

Expand Down Expand Up @@ -150,7 +145,6 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
p.port,
p.numConns,
p.keyspace,
p.connPolicy(),
)
}(host)
}
Expand All @@ -170,13 +164,6 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
delete(p.hostConnPools, addr)
go pool.Close()
}

// update the policy
p.hostPolicy.SetHosts(hosts)
}

func (p *policyConnPool) SetPartitioner(partitioner string) {
p.hostPolicy.SetPartitioner(partitioner)
}

func (p *policyConnPool) Size() int {
Expand All @@ -197,41 +184,10 @@ func (p *policyConnPool) getPool(addr string) (pool *hostConnPool, ok bool) {
return
}

func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
nextHost := p.hostPolicy.Pick(qry)

var (
host SelectedHost
conn *Conn
)

p.mu.RLock()
defer p.mu.RUnlock()
for conn == nil {
host = nextHost()
if host == nil {
break
} else if host.Info() == nil {
panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
}

pool, ok := p.hostConnPools[host.Info().Peer()]
if !ok {
continue
}

conn = pool.Pick(qry)
}
return host, conn
}

func (p *policyConnPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()

// remove the hosts from the policy
p.hostPolicy.SetHosts(nil)

// close the pools
for addr, pool := range p.hostConnPools {
delete(p.hostConnPools, addr)
Expand All @@ -249,25 +205,17 @@ func (p *policyConnPool) addHost(host *HostInfo) {
host.Port(), // TODO: if port == 0 use pool.port?
p.numConns,
p.keyspace,
p.connPolicy(),
)

p.hostConnPools[host.Peer()] = pool
}
p.mu.Unlock()

pool.fill()

// update policy
// TODO: policy should not have conns, it should have hosts and return a host
// iter which the pool will use to serve conns
p.hostPolicy.AddHost(host)
}

func (p *policyConnPool) removeHost(addr string) {
p.hostPolicy.RemoveHost(addr)
p.mu.Lock()

pool, ok := p.hostConnPools[addr]
if !ok {
p.mu.Unlock()
Expand Down Expand Up @@ -301,12 +249,13 @@ type hostConnPool struct {
addr string
size int
keyspace string
policy ConnSelectionPolicy
// protection for conns, closed, filling
mu sync.RWMutex
conns []*Conn
closed bool
filling bool

pos uint32
}

func (h *hostConnPool) String() string {
Expand All @@ -317,7 +266,7 @@ func (h *hostConnPool) String() string {
}

func newHostConnPool(session *Session, host *HostInfo, port, size int,
keyspace string, policy ConnSelectionPolicy) *hostConnPool {
keyspace string) *hostConnPool {

pool := &hostConnPool{
session: session,
Expand All @@ -326,7 +275,6 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
addr: JoinHostPort(host.Peer(), port),
size: size,
keyspace: keyspace,
policy: policy,
conns: make([]*Conn, 0, size),
filling: false,
closed: false,
Expand All @@ -337,16 +285,15 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick(qry *Query) *Conn {
func (pool *hostConnPool) Pick() *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

if pool.closed {
pool.mu.RUnlock()
return nil
}

size := len(pool.conns)
pool.mu.RUnlock()

if size < pool.size {
// try to fill the pool
go pool.fill()
Expand All @@ -356,7 +303,23 @@ func (pool *hostConnPool) Pick(qry *Query) *Conn {
}
}

return pool.policy.Pick(qry)
pos := int(atomic.AddUint32(&pool.pos, 1) - 1)

var (
leastBusyConn *Conn
streamsAvailable int
)

// find the conn which has the most available streams, this is racy
for i := 0; i < size; i++ {
conn := pool.conns[(pos+i)%size]
if streams := conn.AvailableStreams(); streams > streamsAvailable {
leastBusyConn = conn
streamsAvailable = streams
}
}

return leastBusyConn
}

//Size returns the number of connections currently active in the pool
Expand Down Expand Up @@ -543,10 +506,6 @@ func (pool *hostConnPool) connect() (err error) {

pool.conns = append(pool.conns, conn)

conns := make([]*Conn, len(pool.conns))
copy(conns, pool.conns)
pool.policy.SetConns(conns)

return nil
}

Expand All @@ -573,11 +532,6 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
// remove the connection, not preserving order
pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]

// update the policy
conns := make([]*Conn, len(pool.conns))
copy(conns, pool.conns)
pool.policy.SetConns(conns)

// lost a connection, so fill the pool
go pool.fill()
break
Expand All @@ -590,9 +544,6 @@ func (pool *hostConnPool) drainLocked() {
conns := pool.conns
pool.conns = nil

// update the policy
pool.policy.SetConns(nil)

// close the connections
for _, conn := range conns {
conn.Close()
Expand Down
6 changes: 3 additions & 3 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ func (c *controlConn) reconnect(refreshring bool) {
// TODO: should have our own roundrobbin for hosts so that we can try each
// in succession and guantee that we get a different host each time.
if newConn == nil {
_, conn := c.session.pool.Pick(nil)
if conn == nil {
host := c.session.ring.rrHost()
if host == nil {
c.connect(c.session.ring.endpoints)
return
}

var err error
newConn, err = c.session.connect(conn.addr, c, conn.host)
newConn, err = c.session.connect(host.Peer(), c, host)
if err != nil {
// TODO: add log handler for things like this
return
Expand Down
4 changes: 4 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (s *Session) handleNewNode(host net.IP, port int, waitForBinary bool) {
}

s.pool.addHost(hostInfo)
s.policy.AddHost(hostInfo)
hostInfo.setState(NodeUp)

if s.control != nil {
Expand All @@ -222,6 +223,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
}

host.setState(NodeDown)
s.policy.RemoveHost(addr)
s.pool.removeHost(addr)
s.ring.removeHost(addr)

Expand Down Expand Up @@ -251,6 +253,7 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {

host.setPort(port)
s.pool.hostUp(host)
s.policy.HostUp(host)
host.setState(NodeUp)
return
}
Expand All @@ -270,5 +273,6 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
}

host.setState(NodeDown)
s.policy.HostDown(addr)
s.pool.hostDown(addr)
}
2 changes: 1 addition & 1 deletion host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,6 @@ func (r *ringDescriber) refreshRing() error {
}
}

r.session.pool.SetPartitioner(partitioner)
r.session.metadata.setPartitioner(partitioner)
return nil
}
Loading

0 comments on commit a04d2a5

Please sign in to comment.