Skip to content

Commit

Permalink
Merge pull request apache#531 from Zariel/lazy-streams
Browse files Browse the repository at this point in the history
conn: lazily allocate stream call
  • Loading branch information
Zariel committed Nov 19, 2015
2 parents 640dd4c + 83d38bd commit 7e6c797
Showing 1 changed file with 52 additions and 18 deletions.
70 changes: 52 additions & 18 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,17 @@ var TimeoutLimit int64 = 10
// queries, but users are usually advised to use a more reliable, higher
// level API.
type Conn struct {
conn net.Conn
r *bufio.Reader
timeout time.Duration
cfg *ConnConfig
conn net.Conn
r *bufio.Reader
timeout time.Duration
cfg *ConnConfig
numStreams int

headerBuf []byte

uniq chan int
calls []callReq
mu sync.RWMutex
calls map[int]*callReq

errorHandler ConnErrorHandler
compressor Compressor
Expand Down Expand Up @@ -188,7 +190,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
r: bufio.NewReader(conn),
cfg: cfg,
uniq: make(chan int, streams),
calls: make([]callReq, streams),
calls: make(map[int]*callReq),
timeout: cfg.Timeout,
version: uint8(cfg.ProtoVersion),
addr: conn.RemoteAddr().String(),
Expand All @@ -198,6 +200,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
headerBuf: make([]byte, headerSize),
quit: make(chan struct{}),
session: session,
numStreams: streams,
}

if cfg.Keepalive > 0 {
Expand All @@ -207,7 +210,6 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
// reserve stream 0 incase cassandra returns an error on it without us sending
// a request.
for i := 1; i < streams; i++ {
c.calls[i].resp = make(chan error)
c.uniq <- i
}

Expand Down Expand Up @@ -340,8 +342,8 @@ func (c *Conn) closeWithError(err error) {
if err != nil {
// we should attempt to deliver the error back to the caller if it
// exists
for id := 0; id < len(c.calls); id++ {
req := &c.calls[id]
c.mu.RLock()
for _, req := range c.calls {
// we need to send the error to all waiting queries, put the state
// of this conn into not active so that it can not execute any queries.
if err != nil {
Expand All @@ -351,6 +353,7 @@ func (c *Conn) closeWithError(err error) {
}
}
}
c.mu.RUnlock()
}

// if error was nil then unblock the quit channel
Expand Down Expand Up @@ -407,7 +410,7 @@ func (c *Conn) recv() error {
return err
}

if head.stream > len(c.calls) {
if head.stream > c.numStreams {
return fmt.Errorf("gocql: frame header stream is beyond call exepected bounds: %d", head.stream)
} else if head.stream == -1 {
// TODO: handle cassandra event frames, we shouldnt get any currently
Expand All @@ -434,8 +437,10 @@ func (c *Conn) recv() error {
}
}

call := &c.calls[head.stream]
if call == nil || call.framer == nil {
c.mu.RLock()
call, ok := c.calls[head.stream]
c.mu.RUnlock()
if call == nil || call.framer == nil || !ok {
log.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
return c.discardFrame(head)
}
Expand Down Expand Up @@ -463,14 +468,22 @@ func (c *Conn) recv() error {

type callReq struct {
// could use a waitgroup but this allows us to do timeouts on the read/send
resp chan error
framer *framer
timeout chan struct{} // indicates to recv() that a call has timedout
resp chan error
framer *framer
timeout chan struct{} // indicates to recv() that a call has timedout
streamID int // current stream in use
}

func (c *Conn) releaseStream(stream int) {
call := &c.calls[stream]
call.framer = nil
c.mu.Lock()
call := c.calls[stream]
if call != nil && stream != call.streamID {
panic(fmt.Sprintf("attempt to release streamID with ivalid stream: %d -> %+v\n", stream, call))
}
delete(c.calls, stream)
c.mu.Unlock()

streamPool.Put(call)

select {
case c.uniq <- stream:
Expand All @@ -484,6 +497,16 @@ func (c *Conn) handleTimeout() {
}
}

var (
streamPool = sync.Pool{
New: func() interface{} {
return &callReq{
resp: make(chan error),
}
},
}
)

func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
// TODO: move tracer onto conn
var stream int
Expand All @@ -495,9 +518,20 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {

// resp is basically a waiting semaphore protecting the framer
framer := newFramer(c, c, c.compressor, c.version)
call := &c.calls[stream]

c.mu.Lock()
call := c.calls[stream]
if call != nil {
panic(fmt.Sprintf("attempting to use stream already in use: %d -> %+v\n", stream, call))
} else {
call = streamPool.Get().(*callReq)
}
c.calls[stream] = call
c.mu.Unlock()

call.framer = framer
call.timeout = make(chan struct{})
call.streamID = stream

if tracer != nil {
framer.trace()
Expand Down

0 comments on commit 7e6c797

Please sign in to comment.