Skip to content

Commit

Permalink
Merge pull request scylladb#375 from Zariel/consolidate-new-session
Browse files Browse the repository at this point in the history
Consolidate new session
  • Loading branch information
0x6e6562 committed Apr 26, 2015
2 parents f7394ea + c9232c7 commit 6703dc3
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 58 deletions.
47 changes: 1 addition & 46 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,52 +101,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
// CreateSession initializes the cluster based on this config and returns a
// session object that can be used to interact with the database.
func (cfg *ClusterConfig) CreateSession() (*Session, error) {

//Check that hosts in the ClusterConfig is not empty
if len(cfg.Hosts) < 1 {
return nil, ErrNoHosts
}

maxStreams := 128
if cfg.ProtoVersion > protoVersion2 {
maxStreams = 32768
}

if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
cfg.NumStreams = maxStreams
}

pool, err := cfg.ConnPoolType(cfg)
if err != nil {
return nil, err
}

//Adjust the size of the prepared statements cache to match the latest configuration
stmtsLRU.Lock()
initStmtsLRU(cfg.MaxPreparedStmts)
stmtsLRU.Unlock()

//See if there are any connections in the pool
if pool.Size() > 0 {
s := NewSession(pool, *cfg)
s.SetConsistency(cfg.Consistency)
s.SetPageSize(cfg.PageSize)

if cfg.DiscoverHosts {
hostSource := &ringDescriber{
session: s,
dcFilter: cfg.Discovery.DcFilter,
rackFilter: cfg.Discovery.RackFilter,
}

go hostSource.run(cfg.Discovery.Sleep)
}

return s, nil
}

pool.Close()
return nil, ErrNoConnectionsStarted
return NewSession(*cfg)
}

var (
Expand Down
58 changes: 53 additions & 5 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,61 @@ type Session struct {
}

// NewSession wraps an existing Node.
func NewSession(p ConnectionPool, c ClusterConfig) *Session {
session := &Session{Pool: p, cons: c.Consistency, prefetch: 0.25, cfg: c}
func NewSession(cfg ClusterConfig) (*Session, error) {
//Check that hosts in the ClusterConfig is not empty
if len(cfg.Hosts) < 1 {
return nil, ErrNoHosts
}

maxStreams := 128
if cfg.ProtoVersion > protoVersion2 {
maxStreams = 32768
}

if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
cfg.NumStreams = maxStreams
}

pool, err := cfg.ConnPoolType(&cfg)
if err != nil {
return nil, err
}

//Adjust the size of the prepared statements cache to match the latest configuration
stmtsLRU.Lock()
initStmtsLRU(cfg.MaxPreparedStmts)
stmtsLRU.Unlock()

s := &Session{
Pool: pool,
cons: cfg.Consistency,
prefetch: 0.25,
cfg: cfg,
}

//See if there are any connections in the pool
if pool.Size() > 0 {
s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)

s.SetConsistency(cfg.Consistency)
s.SetPageSize(cfg.PageSize)

if cfg.DiscoverHosts {
hostSource := &ringDescriber{
session: s,
dcFilter: cfg.Discovery.DcFilter,
rackFilter: cfg.Discovery.RackFilter,
}

go hostSource.run(cfg.Discovery.Sleep)
}

return s, nil
}

// create the query info cache
session.routingKeyInfoCache.lru = lru.New(c.MaxRoutingKeyInfo)
s.Close()

return session
return nil, ErrNoConnectionsStarted
}

// SetConsistency sets the default consistency level for this session. This
Expand Down
22 changes: 15 additions & 7 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ import (

func TestSessionAPI(t *testing.T) {

cfg := ClusterConfig{}
pool, err := NewSimplePool(&cfg)
cfg := &ClusterConfig{}
pool, err := NewSimplePool(cfg)
if err != nil {
t.Fatal(err)
}

s := NewSession(pool, cfg)
s := &Session{
Pool: pool,
cfg: *cfg,
cons: Quorum,
}

defer s.Close()

s.SetConsistency(All)
Expand Down Expand Up @@ -154,14 +159,17 @@ func TestQueryShouldPrepare(t *testing.T) {

func TestBatchBasicAPI(t *testing.T) {

cfg := ClusterConfig{}
cfg.RetryPolicy = &SimpleRetryPolicy{NumRetries: 2}
pool, err := NewSimplePool(&cfg)
cfg := &ClusterConfig{RetryPolicy: &SimpleRetryPolicy{NumRetries: 2}}
pool, err := NewSimplePool(cfg)
if err != nil {
t.Fatal(err)
}

s := NewSession(pool, cfg)
s := &Session{
Pool: pool,
cfg: *cfg,
cons: Quorum,
}
defer s.Close()

b := s.NewBatch(UnloggedBatch)
Expand Down

0 comments on commit 6703dc3

Please sign in to comment.