Skip to content

Commit

Permalink
Add attempt number to ObservedQuery (apache#1324)
Browse files Browse the repository at this point in the history
This will allow the observer to know whether the attempt was
the original query or a retry.
  • Loading branch information
martin-sucha authored and Zariel committed Jul 8, 2019
1 parent d01d4ad commit 55a38e1
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
6 changes: 2 additions & 4 deletions policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,8 @@ func TestSimpleRetryPolicy(t *testing.T) {
{5, false},
}

q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
for _, c := range cases {
q.metrics.m["127.0.0.1"] = &hostMetrics{Attempts: c.attempts}
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
if c.allow && !rt.Attempt(q) {
t.Fatalf("should allow retry after %d attempts", c.attempts)
}
Expand Down Expand Up @@ -348,9 +347,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
{16, false, reu1, Retry},
}

q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
for _, c := range cases {
q.metrics.m["127.0.0.1"] = &hostMetrics{Attempts: c.attempts}
q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
if c.retryType != rt.GetRetryType(c.err) {
t.Fatalf("retry type should be %v", c.retryType)
}
Expand Down
30 changes: 24 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,18 @@ type hostMetrics struct {
// queryMetrics aggregates the metrics recorded per host together with a
// running total of attempts across all hosts.
type queryMetrics struct {
// l is locked by attempts and addAttempts around their access to the
// fields below.
l sync.RWMutex
// m holds the metrics recorded for each host, keyed by a host string.
// NOTE(review): the tests in this change key it by IP address; confirm the
// key format against hostMetricsLocked.
m map[string]*hostMetrics
// totalAttempts is the total number of attempts.
// It equals the sum of Attempts over all hostMetrics in m.
totalAttempts int
}

// preFilledQueryMetrics builds a queryMetrics around the supplied per-host
// data. The map is stored as given (not copied), and totalAttempts is seeded
// with the sum of Attempts over every entry in it.
func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics {
	total := 0
	for _, perHost := range m {
		total += perHost.Attempts
	}
	return &queryMetrics{m: m, totalAttempts: total}
}

// hostMetricsLocked gets or creates host metrics for given host.
Expand All @@ -690,19 +702,20 @@ func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
// attempts returns the number of times the query was executed.
// The value returned is read while holding qm.l.
//
// NOTE(review): this span is rendered from a diff in which removed and added
// lines appear interleaved. The summation loop over qm.m looks like the
// pre-change body and the qm.totalAttempts read like its replacement, which
// is why attempts is declared twice below. Confirm against the committed
// session.go before relying on this text.
func (qm *queryMetrics) attempts() int {
qm.l.Lock()
var attempts int
for _, metric := range qm.m {
attempts += metric.Attempts
}
attempts := qm.totalAttempts
qm.l.Unlock()
return attempts
}

func (qm *queryMetrics) addAttempts(i int, host *HostInfo) {
// addAttempts records i additional attempts against host and against the
// overall total, all while holding qm.l. It returns the total number of
// attempts as it stood before this call.
func (qm *queryMetrics) addAttempts(i int, host *HostInfo) int {
	qm.l.Lock()
	qm.hostMetricsLocked(host).Attempts += i
	previous := qm.totalAttempts
	qm.totalAttempts = previous + i
	qm.l.Unlock()
	return previous
}

func (qm *queryMetrics) latency() int64 {
Expand Down Expand Up @@ -915,7 +928,7 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
}

func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
q.AddAttempts(1, host)
attempt := q.metrics.addAttempts(1, host)
q.AddLatency(end.Sub(start).Nanoseconds(), host)

if q.observer != nil {
Expand All @@ -928,6 +941,7 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
Host: host,
Metrics: q.metrics.hostMetrics(host),
Err: iter.err,
Attempt: attempt,
})
}
}
Expand Down Expand Up @@ -1889,6 +1903,10 @@ type ObservedQuery struct {
// Err is the error in the query.
// It only tracks network errors or errors of bad Cassandra syntax; in particular, selects with no match return a nil error.
Err error

// Attempt is the index of the attempt at executing this query.
// The first attempt is number zero and any retries have a non-zero attempt number.
Attempt int
}

// QueryObserver is the interface implemented by query observers / stat collectors.
Expand Down
23 changes: 15 additions & 8 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gocql
import (
"context"
"fmt"
"net"
"testing"
)

Expand Down Expand Up @@ -99,25 +100,27 @@ func (f funcQueryObserver) ObserveQuery(ctx context.Context, o ObservedQuery) {
func TestQueryBasicAPI(t *testing.T) {
qry := &Query{}

// Initialise metrics map
qry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}

// Initiate host
ip := "127.0.0.1"

qry.metrics.m[ip] = &hostMetrics{Attempts: 0, TotalLatency: 0}
qry.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 0, TotalLatency: 0}})
if qry.Latency() != 0 {
t.Fatalf("expected Query.Latency() to return 0, got %v", qry.Latency())
}

qry.metrics.m[ip] = &hostMetrics{Attempts: 2, TotalLatency: 4}
qry.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 2, TotalLatency: 4}})
if qry.Attempts() != 2 {
t.Fatalf("expected Query.Attempts() to return 2, got %v", qry.Attempts())
}
if qry.Latency() != 2 {
t.Fatalf("expected Query.Latency() to return 2, got %v", qry.Latency())
}

qry.AddAttempts(2, &HostInfo{hostname: ip, connectAddress: net.ParseIP(ip), port: 9042})
if qry.Attempts() != 4 {
t.Fatalf("expected Query.Attempts() to return 4, got %v", qry.Attempts())
}

qry.Consistency(All)
if qry.GetConsistency() != All {
t.Fatalf("expected Query.GetConsistency to return 'All', got '%s'", qry.GetConsistency())
Expand Down Expand Up @@ -202,21 +205,25 @@ func TestBatchBasicAPI(t *testing.T) {
t.Fatalf("expected batch.Type to be '%v', got '%v'", LoggedBatch, b.Type)
}

b.metrics = &queryMetrics{m: make(map[string]*hostMetrics)}
ip := "127.0.0.1"

// Test attempts
b.metrics.m[ip] = &hostMetrics{Attempts: 1}
b.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 1}})
if b.Attempts() != 1 {
t.Fatalf("expected batch.Attempts() to return %v, got %v", 1, b.Attempts())
}

b.AddAttempts(2, &HostInfo{hostname: ip, connectAddress: net.ParseIP(ip), port: 9042})
if b.Attempts() != 3 {
t.Fatalf("expected batch.Attempts() to return %v, got %v", 3, b.Attempts())
}

// Test latency
if b.Latency() != 0 {
t.Fatalf("expected batch.Latency() to be 0, got %v", b.Latency())
}

b.metrics.m[ip] = &hostMetrics{Attempts: 1, TotalLatency: 4}
b.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 1, TotalLatency: 4}})
if b.Latency() != 4 {
t.Fatalf("expected batch.Latency() to return %v, got %v", 4, b.Latency())
}
Expand Down

0 comments on commit 55a38e1

Please sign in to comment.