Skip to content

Commit

Permalink
prevent duplicate stream usage
Browse files Browse the repository at this point in the history
Remove the atomic waiting variable; it can lead to a case
where a stream is released twice, which causes data races
and unforeseen panics.

This can happen when, in exec, the frame is written and the
response is received before call.waiting is set from 0 to 1. This is
most likely down to the runtime scheduling the recv() io goroutine
before the atomic store happens. The recv() goroutine sees that
call.waiting is 0 and assumes that the call has timed out, releasing
the stream. Then the exec() goroutine is scheduled; it sets
call.waiting to 1 and blocks in the select waiting for the response,
which never comes, so the timeout case is selected and the stream is
released a second time.

Solve this by removing the call.waiting synchronisation which was
not protecting anything as the channels were providing specific
synchronisation points.

Add a stress test benchmark which can reproduce panics that this
patch fixes.

Updates apache#439, apache#435, apache#437, apache#430
  • Loading branch information
Zariel committed Jul 7, 2015
1 parent 1d2c5eb commit 2ed0ffe
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 54 deletions.
9 changes: 6 additions & 3 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
tb.Logf("Created keyspace %s", keyspace)
}

func createSession(tb testing.TB) *Session {
cluster := createCluster()

func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
// Drop and re-create the keyspace once. Different tests should use their own
// individual tables, but can assume that the table does not exist before.
initOnce.Do(func() {
Expand All @@ -123,6 +121,11 @@ func createSession(tb testing.TB) *Session {
return session
}

// createSession returns a session built from the cluster configuration that
// createCluster provides. All of the session setup is delegated to
// createSessionFromCluster.
func createSession(tb testing.TB) *Session {
	return createSessionFromCluster(createCluster(), tb)
}

// TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
func TestAuthentication(t *testing.T) {

Expand Down
87 changes: 36 additions & 51 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,40 @@ func (c *Conn) authenticateHandshake(authFrame *authenticateFrame) error {
}
}

// closeWithError shuts the connection down, optionally reporting err.
//
// Only the first call does any work: the compare-and-swap on c.closed fails
// for every later caller, which returns immediately. When err is non-nil it
// is offered to the resp channel of every entry in c.calls and, if c.started
// is set, handed to c.errorHandler.HandleError. The quit channel and the
// underlying conn are closed whether or not err is nil.
func (c *Conn) closeWithError(err error) {
	if swapped := atomic.CompareAndSwapInt32(&c.closed, 0, 1); !swapped {
		return
	}

	failed := err != nil
	if failed {
		for i := range c.calls {
			// Non-blocking send: a call with nobody ready to receive on resp
			// is skipped rather than stalling the shutdown.
			select {
			case c.calls[i].resp <- err:
			default:
			}
		}
	}

	// Closing quit wakes anything selecting on it; this happens for both the
	// error and the clean-close paths.
	close(c.quit)
	c.conn.Close()

	if c.started && failed {
		c.errorHandler.HandleError(c, err, true)
	}
}

// Close shuts the connection down by calling closeWithError with a nil error.
func (c *Conn) Close() {
c.closeWithError(nil)
}

// Serve starts the stream multiplexer for this connection, which is required
// to execute any queries. This method runs as long as the connection is
// open and is therefore usually called in a separate goroutine.
Expand Down Expand Up @@ -336,13 +370,6 @@ func (c *Conn) recv() error {
}
}

if !atomic.CompareAndSwapInt32(&call.waiting, 1, 0) {
// the waiting thread timed out and is no longer waiting, the stream has
// not yet been readded to the chan so it cant be used again,
c.releaseStream(head.stream)
return nil
}

// either we return a response to the caller, the caller timed out, or the
// connection has closed. Either way we should never block indefinitely here
select {
Expand All @@ -359,7 +386,6 @@ type callReq struct {
// could use a waitgroup but this allows us to do timeouts on the read/send
resp chan error
framer *framer
waiting int32
timeout chan struct{} // indicates to recv() that a call has timedout
}

Expand All @@ -370,7 +396,7 @@ func (c *Conn) releaseStream(stream int) {

select {
case c.uniq <- stream:
default:
case <-c.quit:
}
}

Expand All @@ -389,21 +415,16 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
return nil, ErrConnectionClosed
}

call := &c.calls[stream]
// resp is basically a waiting semaphore protecting the framer
framer := newFramer(c, c, c.compressor, c.version)
call := &c.calls[stream]
call.framer = framer
call.timeout = make(chan struct{})

if tracer != nil {
framer.trace()
}

if !atomic.CompareAndSwapInt32(&call.waiting, 0, 1) {
return nil, errors.New("gocql: stream is busy or closed")
}
defer atomic.StoreInt32(&call.waiting, 0)

err := req.writeFrame(framer, stream)
if err != nil {
return nil, err
Expand Down Expand Up @@ -617,42 +638,6 @@ func (c *Conn) Closed() bool {
return atomic.LoadInt32(&c.closed) == 1
}

// closeWithError marks the connection closed and tears it down. Only the
// first call has any effect: the compare-and-swap on c.closed fails for every
// later caller, which then returns immediately.
//
// A non-nil err is offered to the resp channel of every entry in c.calls and,
// if c.started is set, passed to c.errorHandler.HandleError.
func (c *Conn) closeWithError(err error) {
if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
return
}

if err != nil {
// we should attempt to deliver the error back to the caller if it
// exists
for id := 0; id < len(c.calls); id++ {
req := &c.calls[id]
// mark every call slot with waiting = -1 before offering it the error
atomic.StoreInt32(&req.waiting, -1)

if err != nil {
// non-blocking send: skip the slot if nothing is ready to receive on resp
select {
case req.resp <- err:
default:
}
}
}
}

// the quit channel and the underlying conn are closed whether or not err
// is nil
close(c.quit)
c.conn.Close()

if c.started && err != nil {
c.errorHandler.HandleError(c, err, true)
}
}

// Close shuts the connection down by calling closeWithError with a nil error.
func (c *Conn) Close() {
c.closeWithError(nil)
}

// Address returns the value of the connection's addr field.
func (c *Conn) Address() string {
return c.addr
}
Expand Down
40 changes: 40 additions & 0 deletions stress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// +build all integration

package gocql

import (
"sync/atomic"

"testing"
)

// BenchmarkConnStress issues inserts from many goroutines through a session
// configured with a single connection and `workers` streams. The commit that
// introduced it describes it as a stress test for reproducing the
// double-release panics on a connection's streams.
func BenchmarkConnStress(b *testing.B) {
	const workers = 16

	cluster := createCluster()
	cluster.NumConns = 1
	cluster.NumStreams = workers
	session := createSessionFromCluster(cluster, b)
	defer session.Close()

	if err := createTable(session, "CREATE TABLE IF NOT EXISTS conn_stress (id int primary key)"); err != nil {
		b.Fatal(err)
	}

	// seed is shared by all workers; each one atomically claims its own value
	// so the ids it writes differ from those of the other workers.
	var seed uint64
	writer := func(pb *testing.PB) {
		workerSeed := atomic.AddUint64(&seed, 1)
		var i uint64
		for pb.Next() {
			if err := session.Query("insert into conn_stress (id) values (?)", i*workerSeed).Exec(); err != nil {
				// Error (not Fatal) because this runs on a RunParallel
				// goroutine, not the benchmark's own goroutine.
				b.Error(err)
				return
			}
			i++
		}
	}

	b.SetParallelism(workers)
	// Exclude session and table setup from the measured time.
	b.ResetTimer()
	b.RunParallel(writer)
}

0 comments on commit 2ed0ffe

Please sign in to comment.