Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Revert "Fix bug in query executor that prevented retries from ceasing (apache#1164)"

This reverts commit 0d4b564.

* Revert "Provide a way to limit per-query-attempt deadlines (apache#1151)"

This reverts commit e48272f.
  • Loading branch information
annismckenzie authored and Zariel committed Aug 27, 2018
1 parent 44a37f4 commit 246df0a
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 168 deletions.
2 changes: 0 additions & 2 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,3 @@ Chang Liu <changliu.it@gmail.com>
Ingo Oeser <nightlyone@gmail.com>
Luke Hines <lukehines@protonmail.com>
Jacob Greenleaf <jacob@jacobgreenleaf.com>
Andreas Jaekle <andreas@jaekle.net>
Daniel Lohse <daniel.lohse@alfatraining.de>
43 changes: 2 additions & 41 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,6 @@ type callReq struct {
}

func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*framer, error) {
if ctx != nil && ctx.Err() != nil {
return nil, ctx.Err()
}

// TODO: move tracer onto conn
stream, ok := c.streams.GetStream()
if !ok {
Expand Down Expand Up @@ -794,41 +790,6 @@ func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error
}

func (c *Conn) executeQuery(qry *Query) *Iter {
ctx := qry.context
if rt, ok := qry.rt.(RetryPolicyWithAttemptTimeout); ok && rt.AttemptTimeout() > 0 {
if ctx == nil {
ctx = context.Background()
}
var cancel func()
ctx, cancel = context.WithCancel(ctx)
defer cancel()
if qry.attemptTimeoutTimer == nil {
qry.attemptTimeoutTimer = time.NewTimer(0)
<-qry.attemptTimeoutTimer.C
} else {
if !qry.attemptTimeoutTimer.Stop() {
select {
case <-qry.attemptTimeoutTimer.C:
default:
}
}
}

qry.attemptTimeoutTimer.Reset(rt.AttemptTimeout())
timeoutCh := qry.attemptTimeoutTimer.C

go func() {
select {
case <-ctx.Done():
qry.attemptTimeoutTimer.Stop()
break
case <-timeoutCh:
break
}
cancel()
}()
}

params := queryParams{
consistency: qry.cons,
}
Expand All @@ -853,7 +814,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
if qry.shouldPrepare() {
// Prepare all DML queries. Other queries can not be prepared.
var err error
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace)
info, err = c.prepareStatement(qry.context, qry.stmt, qry.trace)
if err != nil {
return &Iter{err: err}
}
Expand Down Expand Up @@ -902,7 +863,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
}
}

framer, err := c.exec(ctx, frame, qry.trace)
framer, err := c.exec(qry.context, frame, qry.trace)
if err != nil {
return &Iter{err: err}
}
Expand Down
61 changes: 11 additions & 50 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"io"
"io/ioutil"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -283,46 +282,12 @@ func TestTimeout(t *testing.T) {
wg.Wait()
}

type testRetryPolicy struct {
numRetries int // maximum number of times to retry a query
attemptTimeout time.Duration
t *testing.T
}

// Attempt tells gocql to attempt the query again based on query.Attempts being less
// than the NumRetries defined in the policy.
func (s *testRetryPolicy) Attempt(q RetryableQuery) bool {
return q.Attempts() <= s.numRetries
}

func (s *testRetryPolicy) GetRetryType(err error) RetryType {
return Retry
}

// AttemptTimeout satisfies the optional RetryPolicyWithAttemptTimeout interface.
func (s *testRetryPolicy) AttemptTimeout() time.Duration {
return s.attemptTimeout
}

type testQueryObserver struct{}

func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
Logger.Printf("Observed query %q. Returned %v rows, took %v on host %q. Error: %q\n", q.Statement, q.Rows, q.End.Sub(q.Start), q.Host.ConnectAddress().String(), q.Err)
}

// TestQueryRetry will test to make sure that gocql will execute
// the exact amount of retry queries designated by the user.
func TestQueryRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log := &testLogger{}
Logger = log
defer func() {
Logger = &defaultLogger{}
os.Stdout.WriteString(log.String())
}()

srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()

Expand All @@ -341,24 +306,22 @@ func TestQueryRetry(t *testing.T) {
}
}()

rt := &testRetryPolicy{numRetries: 2, t: t, attemptTimeout: time.Millisecond * 25}
queryCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*90)
defer cancel()
qry := db.Query("slow").RetryPolicy(rt).Observer(&testQueryObserver{}).WithContext(queryCtx)
rt := &SimpleRetryPolicy{NumRetries: 1}

qry := db.Query("kill").RetryPolicy(rt)
if err := qry.Exec(); err == nil {
t.Fatalf("expected error")
}

// wait for the last slow query to finish
// this prevents the test from flaking because of writing to a connection that's been closed
time.Sleep(100 * time.Millisecond)

numQueries := atomic.LoadUint64(&srv.nQueries)
requests := atomic.LoadInt64(&srv.nKillReq)
attempts := qry.Attempts()
if requests != int64(attempts) {
t.Fatalf("expected requests %v to match query attempts %v", requests, attempts)
}

// the 90ms timeout allows at most 4 retries but the maximum is 2 as per the retry policy
// the number of queries therefore needs to be 3 (initial query + 2 retries)
if numQueries != 3 {
t.Fatalf("Number of queries should be 3 but query executed %v times", numQueries)
// the query will only be attempted once, but is being retried
if requests != int64(rt.NumRetries) {
t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, requests-1)
}
}

Expand Down Expand Up @@ -812,7 +775,6 @@ type TestServer struct {
nreq uint64
listen net.Listener
nKillReq int64
nQueries uint64
compressor Compressor

protocol byte
Expand Down Expand Up @@ -928,7 +890,6 @@ func (srv *TestServer) process(f *framer) {
f.writeHeader(0, opSupported, head.stream)
f.writeShort(0)
case opQuery:
atomic.AddUint64(&srv.nQueries, 1)
query := f.readLongString()
first := query
if n := strings.Index(query, " "); n > 0 {
Expand Down
11 changes: 0 additions & 11 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package gocql

import (
"context"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -131,7 +130,6 @@ type RetryableQuery interface {
Attempts() int
SetConsistency(c Consistency)
GetConsistency() Consistency
Context() context.Context
}

type RetryType uint16
Expand All @@ -155,15 +153,6 @@ type RetryPolicy interface {
GetRetryType(error) RetryType
}

// RetryPolicyWithAttemptTimeout is an optional interface retry policies can implement
// in order to control the duration to use before a query attempt is considered
// as a timeout and will potentially be retried.
// It's not part of the RetryPolicy interface to remain backwards compatible.
type RetryPolicyWithAttemptTimeout interface {
AttemptTimeout() time.Duration
RetryPolicy
}

// SimpleRetryPolicy has simple logic for attempting a query a fixed number of times.
//
// See below for examples of usage:
Expand Down
98 changes: 45 additions & 53 deletions query_executor.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package gocql

import (
"errors"
"time"
)

// ErrUnknownRetryType is returned if the retry policy returns a retry type
// unknown to the query executor.
var ErrUnknownRetryType = errors.New("unknown retry type returned by retry policy")

type ExecutableQuery interface {
execute(conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
Expand All @@ -33,75 +28,72 @@ func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
return iter
}

// checkRetryPolicy is used by the query executor to determine how a failed query should be handled.
// It consults the query context and the query's retry policy.
func (q *queryExecutor) checkRetryPolicy(rq ExecutableQuery, err error) (RetryType, error) {
if ctx := rq.Context(); ctx != nil {
if ctx.Err() != nil {
return Rethrow, ctx.Err()
}
}
p := rq.retryPolicy()
if p == nil {
return Rethrow, err
}
if p.Attempt(rq) {
return p.GetRetryType(err), nil
}
return Rethrow, err
}

func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
rt := qry.retryPolicy()
hostIter := q.policy.Pick(qry)
var iter *Iter

outer:
var iter *Iter
for hostResponse := hostIter(); hostResponse != nil; hostResponse = hostIter() {
host := hostResponse.Info()
if host == nil || !host.IsUp() {
continue
}
hostPool, ok := q.pool.getPool(host)

pool, ok := q.pool.getPool(host)
if !ok {
continue
}

conn := hostPool.Pick()
conn := pool.Pick()
if conn == nil {
continue
}
inner:
for {
iter = q.attemptQuery(qry, conn)
// Update host
hostResponse.Mark(iter.err)

// note host the query was issued against
iter = q.attemptQuery(qry, conn)
// Update host
hostResponse.Mark(iter.err)

if rt == nil {
iter.host = host
break
}

// exit if the query was successful
if iter.err == nil {
return iter, nil
switch rt.GetRetryType(iter.err) {
case Retry:
for rt.Attempt(qry) {
iter = q.attemptQuery(qry, conn)
hostResponse.Mark(iter.err)
if iter.err == nil {
iter.host = host
return iter, nil
}
if rt.GetRetryType(iter.err) != Retry {
break
}
}
case Rethrow:
return nil, iter.err
case Ignore:
return iter, nil
case RetryNextHost:
default:
}

// consult retry policy on how to proceed
var retryType RetryType
retryType, iter.err = q.checkRetryPolicy(qry, iter.err)
switch retryType {
case Retry:
continue inner
case Rethrow:
return nil, iter.err
case Ignore:
return iter, nil
case RetryNextHost:
continue outer
default:
return nil, ErrUnknownRetryType
}
// Exit for loop if the query was successful
if iter.err == nil {
iter.host = host
return iter, nil
}

if !rt.Attempt(qry) {
// What do here? Should we just return an error here?
break
}
}

if iter == nil {
return nil, ErrNoConnections
}

// if we reach this point, there is no host in the pool
return nil, ErrNoConnections
return iter, nil
}
11 changes: 0 additions & 11 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,6 @@ type Query struct {
disableSkipMetadata bool
context context.Context
idempotent bool
attemptTimeoutTimer *time.Timer

disableAutoPage bool
}
Expand Down Expand Up @@ -804,11 +803,6 @@ func (q *Query) WithContext(ctx context.Context) *Query {
return q
}

// Context satisfies the ExecutableQuery interface.
func (q *Query) Context() context.Context {
return q.context
}

func (q *Query) execute(conn *Conn) *Iter {
return conn.executeQuery(q)
}
Expand Down Expand Up @@ -1491,11 +1485,6 @@ func (b *Batch) WithContext(ctx context.Context) *Batch {
return b
}

// Context satisfies the ExecutableQuery interface.
func (b *Batch) Context() context.Context {
return b.context
}

// Size returns the number of batch statements to be executed by the batch operation.
func (b *Batch) Size() int {
return len(b.Entries)
Expand Down

0 comments on commit 246df0a

Please sign in to comment.