From 55a38e15c5db050d4bbe96061a31cbe2d661fa3e Mon Sep 17 00:00:00 2001 From: Martin Sucha <2007393+martin-sucha@users.noreply.github.com> Date: Mon, 8 Jul 2019 16:50:58 +0200 Subject: [PATCH] Add attempt number to ObservedQuery (#1324) This will allow the observer to know whether the attempt was original query or retry. --- policies_test.go | 6 ++---- session.go | 30 ++++++++++++++++++++++++------ session_test.go | 23 +++++++++++++++-------- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/policies_test.go b/policies_test.go index 85c6f4ee4..06114ba15 100644 --- a/policies_test.go +++ b/policies_test.go @@ -263,9 +263,8 @@ func TestSimpleRetryPolicy(t *testing.T) { {5, false}, } - q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} for _, c := range cases { - q.metrics.m["127.0.0.1"] = &hostMetrics{Attempts: c.attempts} + q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) if c.allow && !rt.Attempt(q) { t.Fatalf("should allow retry after %d attempts", c.attempts) } @@ -348,9 +347,8 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) { {16, false, reu1, Retry}, } - q.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} for _, c := range cases { - q.metrics.m["127.0.0.1"] = &hostMetrics{Attempts: c.attempts} + q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}}) if c.retryType != rt.GetRetryType(c.err) { t.Fatalf("retry type should be %v", c.retryType) } diff --git a/session.go b/session.go index 943750aca..50adc873e 100644 --- a/session.go +++ b/session.go @@ -664,6 +664,18 @@ type hostMetrics struct { type queryMetrics struct { l sync.RWMutex m map[string]*hostMetrics + // totalAttempts is total number of attempts. + // Equal to sum of all hostMetrics' Attempts. + totalAttempts int +} + +// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data. +func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics { + qm := &queryMetrics{m: m} + for _, hm := range qm.m { + qm.totalAttempts += hm.Attempts + } + return qm } // hostMetricsLocked gets or creates host metrics for given host. @@ -690,19 +702,20 @@ func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics { // attempts returns the number of times the query was executed. func (qm *queryMetrics) attempts() int { qm.l.Lock() - var attempts int - for _, metric := range qm.m { - attempts += metric.Attempts - } + attempts := qm.totalAttempts qm.l.Unlock() return attempts } -func (qm *queryMetrics) addAttempts(i int, host *HostInfo) { +// addAttempts adds given number of attempts and returns previous total attempts. +func (qm *queryMetrics) addAttempts(i int, host *HostInfo) int { qm.l.Lock() hostMetric := qm.hostMetricsLocked(host) hostMetric.Attempts += i + attempts := qm.totalAttempts + qm.totalAttempts += i qm.l.Unlock() + return attempts } func (qm *queryMetrics) latency() int64 { @@ -915,7 +928,7 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { } func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { - q.AddAttempts(1, host) + attempt := q.metrics.addAttempts(1, host) q.AddLatency(end.Sub(start).Nanoseconds(), host) if q.observer != nil { @@ -928,6 +941,7 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host Host: host, Metrics: q.metrics.hostMetrics(host), Err: iter.err, + Attempt: attempt, }) } } @@ -1889,6 +1903,10 @@ type ObservedQuery struct { // Err is the error in the query. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error Err error + + // Attempt is the index of attempt at executing this query. + // The first attempt is number zero and any retries have non-zero attempt number. + Attempt int } // QueryObserver is the interface implemented by query observers / stat collectors. diff --git a/session_test.go b/session_test.go index dc0761eec..cbc64f90b 100644 --- a/session_test.go +++ b/session_test.go @@ -5,6 +5,7 @@ package gocql import ( "context" "fmt" + "net" "testing" ) @@ -99,18 +100,15 @@ func (f funcQueryObserver) ObserveQuery(ctx context.Context, o ObservedQuery) { func TestQueryBasicAPI(t *testing.T) { qry := &Query{} - // Initialise metrics map - qry.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} - // Initiate host ip := "127.0.0.1" - qry.metrics.m[ip] = &hostMetrics{Attempts: 0, TotalLatency: 0} + qry.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 0, TotalLatency: 0}}) if qry.Latency() != 0 { t.Fatalf("expected Query.Latency() to return 0, got %v", qry.Latency()) } - qry.metrics.m[ip] = &hostMetrics{Attempts: 2, TotalLatency: 4} + qry.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 2, TotalLatency: 4}}) if qry.Attempts() != 2 { t.Fatalf("expected Query.Attempts() to return 2, got %v", qry.Attempts()) } @@ -118,6 +116,11 @@ func TestQueryBasicAPI(t *testing.T) { t.Fatalf("expected Query.Latency() to return 2, got %v", qry.Latency()) } + qry.AddAttempts(2, &HostInfo{hostname: ip, connectAddress: net.ParseIP(ip), port: 9042}) + if qry.Attempts() != 4 { + t.Fatalf("expected Query.Attempts() to return 4, got %v", qry.Attempts()) + } + qry.Consistency(All) if qry.GetConsistency() != All { t.Fatalf("expected Query.GetConsistency to return 'All', got '%s'", qry.GetConsistency()) @@ -202,21 +205,25 @@ func TestBatchBasicAPI(t *testing.T) { t.Fatalf("expected batch.Type to be '%v', got '%v'", LoggedBatch, b.Type) } - b.metrics = &queryMetrics{m: make(map[string]*hostMetrics)} ip := "127.0.0.1" // Test attempts - b.metrics.m[ip] = &hostMetrics{Attempts: 1} + b.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 1}}) if b.Attempts() != 1 { t.Fatalf("expected batch.Attempts() to return %v, got %v", 1, b.Attempts()) } + b.AddAttempts(2, &HostInfo{hostname: ip, connectAddress: net.ParseIP(ip), port: 9042}) + if b.Attempts() != 3 { + t.Fatalf("expected batch.Attempts() to return %v, got %v", 3, b.Attempts()) + } + // Test latency if b.Latency() != 0 { t.Fatalf("expected batch.Latency() to be 0, got %v", b.Latency()) } - b.metrics.m[ip] = &hostMetrics{Attempts: 1, TotalLatency: 4} + b.metrics = preFilledQueryMetrics(map[string]*hostMetrics{ip: {Attempts: 1, TotalLatency: 4}}) if b.Latency() != 4 { t.Fatalf("expected batch.Latency() to return %v, got %v", 4, b.Latency()) }