Skip to content

Commit

Permalink
Implement speculative execution
Browse files Browse the repository at this point in the history
* Refactor executeQuery to execute main code in a separate goroutine
* Handle speculative/non-speculative cases separately
* Add TestSpeculativeExecution test

Signed-off-by: Alex Lourie <alex@instaclustr.com>
  • Loading branch information
Alex Lourie committed Sep 10, 2018
1 parent 25a65b3 commit 95e91c2
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 41 deletions.
89 changes: 87 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -44,8 +46,8 @@ func TestApprove(t *testing.T) {

func TestJoinHostPort(t *testing.T) {
tests := map[string]string{
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
"127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:0": JoinHostPort("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 0),
"[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1": JoinHostPort("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1", 9142),
}
Expand Down Expand Up @@ -436,6 +438,75 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {

}

// testRetryPolicy is a minimal retry policy for tests: its Attempt method
// allows another try while the query's attempt count is at most NumRetries,
// and its GetRetryType method always answers Retry.
type testRetryPolicy struct {
// NumRetries is the bound that Attempt compares qry.Attempts() against.
NumRetries int
}

// Attempt reports whether qry may be tried again. It returns false as soon as
// the number of attempts recorded on qry exceeds NumRetries, true otherwise.
func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool {
	if made := qry.Attempts(); made > t.NumRetries {
		return false
	}
	return true
}
// GetRetryType always answers Retry; err is not inspected.
func (t *testRetryPolicy) GetRetryType(err error) RetryType {
	var verdict RetryType = Retry
	return verdict
}

// TestSpeculativeExecution sends an idempotent "speculative" query to a
// session backed by three local test servers, using a speculative policy that
// allows two executions, and checks that not every node was contacted and that
// no more than five requests were issued in total.
func TestSpeculativeExecution(t *testing.T) {
	// Capture log output while the test runs; on exit put a default logger
	// back and dump whatever was captured to stdout.
	captured := &testLogger{}
	Logger = captured
	defer func() {
		Logger = &defaultLogger{}
		os.Stdout.WriteString(captured.String())
	}()

	// A single context is shared by all of the servers.
	ctx := context.Background()

	// Start one test server per loopback address.
	var nodes []*TestServer
	for _, ip := range []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"} {
		srv := NewTestServerWithAddress(ip+":0", t, defaultProto, ctx)
		defer srv.Stop()
		nodes = append(nodes, srv)
	}

	db, err := newTestSession(defaultProto, nodes[0].Address, nodes[1].Address, nodes[2].Address)
	if err != nil {
		t.Fatalf("NewCluster: %v", err)
	}
	defer db.Close()

	// Retry policy with plenty of retries, and a speculative policy whose
	// number of executions is smaller than the number of nodes.
	retries := &testRetryPolicy{NumRetries: 7}
	speculation := &SimpleSpeculativeExecution{Attempts: 2, Pause: 200 * time.Millisecond}

	qry := db.Query("speculative").
		RetryPolicy(retries).
		SetSpeculativeExecutionPolicy(speculation).
		Idempotent(true)

	// The query itself must not fail.
	if err := qry.Exec(); err != nil {
		t.Errorf("The query failed with '%v'!\n", err)
	}

	// Tally the per-node request counters once.
	var total int64
	contacted := 0
	for _, node := range nodes {
		seen := atomic.LoadInt64(&node.nKillReq)
		total += seen
		if seen != 0 {
			contacted++
		}
	}

	// With two speculative executions, at most two nodes should have been hit.
	if contacted == len(nodes) {
		t.Error("error: all 3 nodes were attempted, should have been only 2")
	}

	// The test server answers "speculative" successfully from a node's third
	// request onwards, so the expected ceiling is 3 requests on the node that
	// succeeds plus at most 2 on the other one.
	if total > 5 {
		t.Errorf("error: expected to see 5 attempts maximum, got %v\n", total)
	}
}

func TestStreams_Protocol1(t *testing.T) {
srv := NewTestServer(t, protoVersion1, context.Background())
defer srv.Stop()
Expand Down Expand Up @@ -1075,6 +1146,20 @@ func (srv *TestServer) process(f *framer) {
}
}()
return
case "speculative":
atomic.AddInt64(&srv.nKillReq, 1)
if atomic.LoadInt64(&srv.nKillReq) > 2 {
f.writeHeader(0, opResult, head.stream)
f.writeInt(resultKindVoid)
f.writeString("speculative query success on the node " + srv.Address)
} else {
f.writeHeader(0, opError, head.stream)
f.writeInt(0x1001)
f.writeString("speculative error")
rand.Seed(time.Now().UnixNano())
sleepFor := time.Duration(time.Millisecond * time.Duration((rand.Intn(50) + 25)))
<-time.After(sleepFor)
}
default:
f.writeHeader(0, opResult, head.stream)
f.writeInt(resultKindVoid)
Expand Down
193 changes: 154 additions & 39 deletions query_executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gocql

import (
"sync"
"time"
)

Expand All @@ -19,6 +20,7 @@ type ExecutableQuery interface {
// queryExecutor runs queries on connections taken from pool, against hosts
// chosen by policy.
type queryExecutor struct {
// pool is asked (getPool) for the connection pool of each selected host.
pool *policyConnPool
// policy supplies the host iterator for a query via Pick.
policy HostSelectionPolicy
// specWG tracks in-flight runAndStore goroutines: the executor calls Add(1)
// before starting one and runAndStore calls Done.
// NOTE(review): as a field of the executor it is shared by every query run
// through it — confirm that concurrent queries waiting on each other's
// goroutines is intended.
specWG sync.WaitGroup
}

func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
Expand All @@ -32,12 +34,28 @@ func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
}

// executeQuery dispatches qry to executeNormalQuery or
// executeSpeculativeQuery and returns whatever the chosen path returns.
func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
// NOTE(review): the statements from here down to `host := hostResponse.Info()`
// look like the pre-change body, which this diff view renders without +/-
// markers (the for loop they open is never closed in this view). Confirm
// against the repository before relying on them.
rt := qry.retryPolicy()
hostIter := q.policy.Pick(qry)

var iter *Iter
for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
host := hostResponse.Info()
// Decide between normal and speculative execution: the normal path is taken
// when the query's speculative policy is a NonSpeculativeExecution or when
// the query is not marked idempotent; otherwise the query runs
// speculatively.
_, nonSpeculativeExecution := qry.speculativeExecutionPolicy().(NonSpeculativeExecution)
if nonSpeculativeExecution || !qry.IsIdempotent() {
return q.executeNormalQuery(qry)
} else {
return q.executeSpeculativeQuery(qry)
}
}

func (q *queryExecutor) executeNormalQuery(qry ExecutableQuery) (*Iter, error) {

var res queryResponse

results := make(chan queryResponse)
defer close(results)

hostIter := q.policy.Pick(qry)
for selectedHost := hostIter(); selectedHost != nil; selectedHost = hostIter() {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
continue
}
Expand All @@ -52,51 +70,148 @@ func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
continue
}

iter = q.attemptQuery(qry, conn)
// Update host
hostResponse.Mark(iter.err)
// store the result
q.specWG.Add(1)
go q.runAndStore(qry, conn, selectedHost, results, nil)

// fetch from the channel whatever is there
res = <-results

if rt == nil {
iter.host = host
break
if res.retryType == RetryNextHost {
continue
}

switch rt.GetRetryType(iter.err) {
case Retry:
for rt.Attempt(qry) {
iter = q.attemptQuery(qry, conn)
hostResponse.Mark(iter.err)
if iter.err == nil {
iter.host = host
return iter, nil
}
if rt.GetRetryType(iter.err) != Retry {
break
}
}
case Rethrow:
return nil, iter.err
case Ignore:
return iter, nil
case RetryNextHost:
default:
return res.iter, res.err

}

if res.iter == nil {
return nil, ErrNoConnections
}

return res.iter, nil
}

// executeSpeculativeQuery starts up to sp.Executions() runAndStore goroutines,
// one per usable host from the selection policy, pausing sp.Delay() before
// every start after the first, and then returns the first response whose err
// is nil. Responses marked RetryNextHost are skipped.
//
// NOTE(review): several lines below reference `iter` and `rt`, which this
// function never declares; they look like removed lines of the diff rendered
// without +/- markers. Confirm against the repository.
func (q *queryExecutor) executeSpeculativeQuery(qry ExecutableQuery) (*Iter, error) {
hostIter := q.policy.Pick(qry)
sp := qry.speculativeExecutionPolicy()

// Unbuffered: a runAndStore send blocks until the range loop below receives.
results := make(chan queryResponse)

// Closed on return; runAndStore checks it before entering its retry loop.
stopExecutions := make(chan struct{})
defer close(stopExecutions)

var (
res queryResponse
specExecCounter int
)
// NOTE(review): no response is received until this launch loop has finished,
// so every execution allowed by the policy is started (with its pause) even
// if an earlier one has already completed — confirm this is intended.
for selectedHost := hostIter(); selectedHost != nil && specExecCounter < sp.Executions(); selectedHost = hostIter() {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
continue
}

// Exit for loop if the query was successful
if iter.err == nil {
iter.host = host
return iter, nil
pool, ok := q.pool.getPool(host)
if !ok {
continue
}

conn := pool.Pick()
if conn == nil {
continue
}

// if it's not the first attempt, pause
if specExecCounter > 0 {
<-time.After(sp.Delay())
}

// push the results into a channel
q.specWG.Add(1)
go q.runAndStore(qry, conn, selectedHost, results, stopExecutions)
specExecCounter += 1
}

// Close results once every started goroutine has called Done, which ends
// the range loop below.
go func() {
q.specWG.Wait()
close(results)
}()

// check the results
// NOTE(review): `res` here is a new loop-scoped variable that shadows the
// `res` declared in the var block above. The outer `res` is never assigned
// in the visible code, so the `res.iter == nil` test after the loop sees the
// zero value whenever no response had a nil err.
for res := range results {
if res.retryType == RetryNextHost {
continue
}

if !rt.Attempt(qry) {
// What do here? Should we just return an error here?
break
if res.err == nil {
return res.iter, res.err
}
}

if iter == nil {
if res.iter == nil {
return nil, ErrNoConnections
}

return iter, nil
return res.iter, nil
}

// runAndStore executes qry on conn, applies the query's retry policy on that
// same connection, and reports the outcome on results.
//
// It is meant to be started as a goroutine after the caller has called
// q.specWG.Add(1); the matching Done is deferred here. Exactly one
// queryResponse is delivered per call, unless stopExecutions is closed first,
// in which case nothing more is sent.
//
// The response carries:
//   - iter only, when the first attempt succeeds, when no retry policy is set,
//     when a retry succeeds, or when the policy says Ignore;
//   - err only, when the policy says Rethrow;
//   - retryType RetryNextHost, when the policy asks for another host;
//   - iter and err, for any other retry type and when the policy stops
//     allowing attempts while the query is still failing.
//
// stopExecutions may be nil, as in the non-speculative path. A nil channel is
// never ready, so the call then simply blocks until its response is received.
func (q *queryExecutor) runAndStore(qry ExecutableQuery, conn *Conn, selectedHost SelectedHost, results chan queryResponse, stopExecutions chan struct{}) {
	// Register Done before doing any work so the wait group is released on
	// every exit path.
	defer q.specWG.Done()

	// deliver hands resp to the caller, but gives up once stopExecutions is
	// closed: at that point nobody may be receiving any more, and an
	// unconditional send would block this goroutine (and the wait group)
	// forever.
	deliver := func(resp queryResponse) {
		select {
		case results <- resp:
		case <-stopExecutions:
		}
	}

	rt := qry.retryPolicy()
	host := selectedHost.Info()

	// First attempt; record the outcome on the host.
	iter := q.attemptQuery(qry, conn)
	iter.host = host
	selectedHost.Mark(iter.err)

	// Done if the query succeeded or there is no retry policy to consult.
	if rt == nil || iter.err == nil {
		deliver(queryResponse{iter: iter})
		return
	}

	for {
		// Check for a stop request before every retry, not just the first.
		select {
		case <-stopExecutions:
			// We're done, stop everyone else.
			qry.Cancel()
			return
		default:
		}

		if !rt.Attempt(qry) {
			break
		}

		switch rt.GetRetryType(iter.err) {
		case Retry:
			iter = q.attemptQuery(qry, conn)
			iter.host = host
			selectedHost.Mark(iter.err)
			if iter.err == nil {
				deliver(queryResponse{iter: iter})
				return
			}
		case Rethrow:
			deliver(queryResponse{err: iter.err})
			return
		case Ignore:
			deliver(queryResponse{iter: iter})
			return
		case RetryNextHost:
			deliver(queryResponse{retryType: RetryNextHost})
			return
		default:
			deliver(queryResponse{iter: iter, err: iter.err})
			return
		}
	}

	// The policy refused further attempts while the query was still failing.
	// Report the last failure instead of returning silently, which would
	// leave a caller blocked on results waiting forever.
	deliver(queryResponse{iter: iter, err: iter.err})
}

// queryResponse is the message a runAndStore goroutine sends back to the
// executor over the results channel.
type queryResponse struct {
// iter is the iterator produced by the last attempt; it is left nil in the
// Rethrow and RetryNextHost responses.
iter *Iter
// err is the attempt's error when runAndStore reports a failure (Rethrow
// and the default retry-type branch); nil otherwise.
err error
// retryType is set explicitly only to RetryNextHost; the executors skip
// such a response instead of returning it.
retryType RetryType
}

0 comments on commit 95e91c2

Please sign in to comment.