Query metric per host (#1155) (apache#1156)
* Store attempts and latencies in a host-based map

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Update ObservedQuery for the new metrics.

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Test improvements

* Now it's possible to spin up multi-node test clusters
* Batch tests moved from the legacy NewBatch() constructor to session.NewBatch() calls.

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Added test for query with metrics and QueryObserver

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Test on Travis with newer C* version and add self to contributors

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Initializing values in the same way

Signed-off-by: Alex Lourie <alex@instaclustr.com>
alourie authored and Zariel committed Aug 31, 2018
1 parent 78e324a commit 4d29881
Showing 8 changed files with 199 additions and 57 deletions.
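The diffs below replace the query's single attempt counter (q.attempts) with a map of per-host metrics. The tests exercise two fields, Attempts and TotalLatency, keyed by the host's connect address. A minimal sketch of the shape this implies; the field types are assumptions inferred from the tests, since the defining file is among the two changed files this page did not render:

// Sketch only: inferred from the tests below, not copied from the source.
type queryMetrics struct {
	Attempts     int   // times the query ran against this host
	TotalLatency int64 // accumulated latency for this host (nanoseconds assumed)
}

// Each query would hold one entry per host, keyed by connect address, e.g.:
//   q.metrics["127.0.0.1"].Attempts
//   q.metrics["127.0.0.1"].TotalLatency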
2 changes: 2 additions & 0 deletions .travis.yml
@@ -27,6 +27,8 @@ env:
AUTH=false
- CASS=3.11.3
AUTH=false
- CASS=3.11.3
AUTH=false

go:
- "1.9"
1 change: 1 addition & 0 deletions AUTHORS
@@ -106,3 +106,4 @@ Chang Liu <changliu.it@gmail.com>
Ingo Oeser <nightlyone@gmail.com>
Luke Hines <lukehines@protonmail.com>
Jacob Greenleaf <jacob@jacobgreenleaf.com>
+Alex Lourie <alex@instaclustr.com>; <djay.il@gmail.com>
10 changes: 5 additions & 5 deletions cassandra_test.go
@@ -591,7 +591,7 @@ func TestBatch(t *testing.T) {
t.Fatal("create table:", err)
}

-batch := NewBatch(LoggedBatch)
+batch := session.NewBatch(LoggedBatch)
for i := 0; i < 100; i++ {
batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
}
@@ -623,9 +623,9 @@ func TestUnpreparedBatch(t *testing.T) {

var batch *Batch
if session.cfg.ProtoVersion == 2 {
-batch = NewBatch(CounterBatch)
+batch = session.NewBatch(CounterBatch)
} else {
-batch = NewBatch(UnloggedBatch)
+batch = session.NewBatch(UnloggedBatch)
}

for i := 0; i < 100; i++ {
@@ -664,7 +664,7 @@ func TestBatchLimit(t *testing.T) {
t.Fatal("create table:", err)
}

-batch := NewBatch(LoggedBatch)
+batch := session.NewBatch(LoggedBatch)
for i := 0; i < 65537; i++ {
batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
}
@@ -1869,7 +1869,7 @@ func TestBatchObserve(t *testing.T) {

var observedBatch *observation

-batch := NewBatch(LoggedBatch)
+batch := session.NewBatch(LoggedBatch)
batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) {
if observedBatch != nil {
t.Fatal("batch observe called more than once")
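A pattern worth noting across the four batch tests above: batches are now created from the session rather than through the package-level NewBatch constructor, presumably so a batch picks up the session's configuration and metrics plumbing. A minimal sketch of the calling convention from application code (the surrounding setup is assumed; identifiers match the diff):

// Assumes a *gocql.Session named session, created as usual from a ClusterConfig.
batch := session.NewBatch(gocql.LoggedBatch) // was: gocql.NewBatch(gocql.LoggedBatch)
batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, 42)
if err := session.ExecuteBatch(batch); err != nil {
	// handle the failed batch
}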
121 changes: 101 additions & 20 deletions conn_test.go
@@ -56,8 +56,8 @@ func TestJoinHostPort(t *testing.T) {
}
}

-func testCluster(addr string, proto protoVersion) *ClusterConfig {
-cluster := NewCluster(addr)
+func testCluster(proto protoVersion, addresses ...string) *ClusterConfig {
+cluster := NewCluster(addresses...)
cluster.ProtoVersion = int(proto)
cluster.disableControlConn = true
return cluster
@@ -67,7 +67,7 @@ func TestSimple(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
db, err := cluster.CreateSession()
if err != nil {
t.Fatalf("0x%x: NewCluster: %v", defaultProto, err)
@@ -107,7 +107,7 @@ func TestSSLSimpleNoClientCert(t *testing.T) {
}

func createTestSslCluster(addr string, proto protoVersion, useClientCert bool) *ClusterConfig {
-cluster := testCluster(addr, proto)
+cluster := testCluster(proto, addr)
sslOpts := &SslOptions{
CaPath: "testdata/pki/ca.crt",
EnableHostVerification: false,
@@ -128,7 +128,7 @@ func TestClosed(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-session, err := newTestSession(srv.Address, defaultProto)
+session, err := newTestSession(defaultProto, srv.Address)
if err != nil {
t.Fatalf("0x%x: NewCluster: %v", defaultProto, err)
}
@@ -140,8 +140,8 @@ func TestClosed(t *testing.T) {
}
}

-func newTestSession(addr string, proto protoVersion) (*Session, error) {
-return testCluster(addr, proto).CreateSession()
+func newTestSession(proto protoVersion, addresses ...string) (*Session, error) {
+return testCluster(proto, addresses...).CreateSession()
}

func TestDNSLookupConnected(t *testing.T) {
@@ -255,7 +255,7 @@ func TestTimeout(t *testing.T) {
srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()

-db, err := newTestSession(srv.Address, defaultProto)
+db, err := newTestSession(defaultProto, srv.Address)
if err != nil {
t.Fatalf("NewCluster: %v", err)
}
@@ -282,6 +282,24 @@ func TestTimeout(t *testing.T) {
wg.Wait()
}

type testQueryObserver struct {
metrics map[string]*queryMetrics
verbose bool
}

func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
host := q.Host.ConnectAddress().String()
o.metrics[host] = q.Metrics
if o.verbose {
Logger.Printf("Observed query %q. Returned %v rows, took %v on host %q with %v attempts and total latency %v. Error: %q\n",
q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.Attempts, q.Metrics.TotalLatency, q.Err)
}
}

func (o *testQueryObserver) GetMetrics(host *HostInfo) *queryMetrics {
return o.metrics[host.ConnectAddress().String()]
}

// TestQueryRetry will test to make sure that gocql will execute
// the exact amount of retry queries designated by the user.
func TestQueryRetry(t *testing.T) {
@@ -291,7 +309,7 @@ func TestQueryRetry(t *testing.T) {
srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()

-db, err := newTestSession(srv.Address, defaultProto)
+db, err := newTestSession(defaultProto, srv.Address)
if err != nil {
t.Fatalf("NewCluster: %v", err)
}
@@ -325,13 +343,72 @@ func TestQueryRetry(t *testing.T) {
}
}

func TestQueryMultinodeWithMetrics(t *testing.T) {

// Build a 3-node cluster to test host metric mapping
var nodes []*TestServer
var addresses = []string{
"127.0.0.1",
"127.0.0.2",
"127.0.0.3",
}
// A single context can be shared by all the servers
ctx := context.Background()
for _, ip := range addresses {
srv := NewTestServerWithAddress(ip+":0", t, defaultProto, ctx)
defer srv.Stop()
nodes = append(nodes, srv)
}

db, err := newTestSession(defaultProto, nodes[0].Address, nodes[1].Address, nodes[2].Address)
if err != nil {
t.Fatalf("NewCluster: %v", err)
}
defer db.Close()

// 3 retries in total: one per host in the 3-node cluster
rt := &SimpleRetryPolicy{NumRetries: 3}
observer := &testQueryObserver{metrics: make(map[string]*queryMetrics), verbose: false}
qry := db.Query("kill").RetryPolicy(rt).Observer(observer)
if err := qry.Exec(); err == nil {
t.Fatalf("expected error")
}

for i, ip := range addresses {
host := &HostInfo{connectAddress: net.ParseIP(ip)}
observedMetrics := observer.GetMetrics(host)

requests := int(atomic.LoadInt64(&nodes[i].nKillReq))
hostAttempts := qry.metrics[ip].Attempts
if requests != hostAttempts {
t.Fatalf("expected requests %v to match query attempts %v", requests, hostAttempts)
}

if hostAttempts != observedMetrics.Attempts {
t.Fatalf("expected observed attempts %v to match query attempts %v on host %v", observedMetrics.Attempts, hostAttempts, ip)
}

hostLatency := qry.metrics[ip].TotalLatency
observedLatency := observedMetrics.TotalLatency
if hostLatency != observedLatency {
t.Fatalf("expected observed latency %v to match query latency %v on host %v", observedLatency, hostLatency, ip)
}
}
// the caller executes the query once; the retry policy drives the remaining attempts
attempts := qry.Attempts()
if attempts != rt.NumRetries {
t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, attempts)
}

}

func TestStreams_Protocol1(t *testing.T) {
srv := NewTestServer(t, protoVersion1, context.Background())
defer srv.Stop()

// TODO: these are more like session tests and should instead operate
// on a single Conn
-cluster := testCluster(srv.Address, protoVersion1)
+cluster := testCluster(protoVersion1, srv.Address)
cluster.NumConns = 1
cluster.ProtoVersion = 1

@@ -363,7 +440,7 @@ func TestStreams_Protocol3(t *testing.T) {

// TODO: these are more like session tests and should instead operate
// on a single Conn
-cluster := testCluster(srv.Address, protoVersion3)
+cluster := testCluster(protoVersion3, srv.Address)
cluster.NumConns = 1
cluster.ProtoVersion = 3

@@ -439,7 +516,7 @@ func TestQueryTimeout(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
// Set the timeout arbitrarily low so that the query hits the timeout in a
// timely manner.
cluster.Timeout = 1 * time.Millisecond
@@ -476,7 +553,7 @@ func BenchmarkSingleConn(b *testing.B) {
srv := NewTestServer(b, 3, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, 3)
+cluster := testCluster(3, srv.Address)
// Set the timeout arbitrarily low so that the query hits the timeout in a
// timely manner.
cluster.Timeout = 500 * time.Millisecond
@@ -507,7 +584,7 @@ func TestQueryTimeoutReuseStream(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
// Set the timeout arbitrarily low so that the query hits the timeout in a
// timely manner.
cluster.Timeout = 1 * time.Millisecond
@@ -531,7 +608,7 @@ func TestQueryTimeoutClose(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
// Set the timeout arbitrarily low so that the query hits the timeout in a
// timely manner.
cluster.Timeout = 1000 * time.Millisecond
@@ -625,7 +702,7 @@ func TestContext_Timeout(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
cluster.Timeout = 5 * time.Second
db, err := cluster.CreateSession()
if err != nil {
@@ -663,7 +740,7 @@ func TestFrameHeaderObserver(t *testing.T) {
srv := NewTestServer(t, defaultProto, context.Background())
defer srv.Stop()

-cluster := testCluster(srv.Address, defaultProto)
+cluster := testCluster(defaultProto, srv.Address)
cluster.NumConns = 1
observer := &recordingFrameHeaderObserver{t: t}
cluster.FrameHeaderObserver = observer
@@ -695,8 +772,8 @@ func TestFrameHeaderObserver(t *testing.T) {
}
}

-func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
-laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
+func NewTestServerWithAddress(addr string, t testing.TB, protocol uint8, ctx context.Context) *TestServer {
+laddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
t.Fatal(err)
}
@@ -728,6 +805,10 @@ func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
return srv
}

+func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
+return NewTestServerWithAddress("127.0.0.1:0", t, protocol, ctx)
+}

func NewSSLTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
pem, err := ioutil.ReadFile("testdata/pki/ca.crt")
certPool := x509.NewCertPool()
@@ -788,7 +869,7 @@ type TestServer struct {
}

func (srv *TestServer) session() (*Session, error) {
-return testCluster(srv.Address, protoVersion(srv.protocol)).CreateSession()
+return testCluster(protoVersion(srv.protocol), srv.Address).CreateSession()
}

func (srv *TestServer) host() *HostInfo {
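One detail of the refactor above: NewTestServer now delegates to NewTestServerWithAddress with "127.0.0.1:0", and TestQueryMultinodeWithMetrics passes ip+":0" for three distinct loopback addresses. Port 0 asks the kernel for a free ephemeral port, which is what lets several test servers coexist without collisions (the 127.0.0.2 and 127.0.0.3 aliases are available by default on Linux). A standalone illustration using only the standard library:

package main

import (
	"fmt"
	"log"
	"net"
)

func main() {
	// Port 0 defers port selection to the OS.
	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	ln, err := net.ListenTCP("tcp", laddr)
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	// The listener reports the address it actually bound.
	fmt.Println(ln.Addr()) // e.g. 127.0.0.1:49321
}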
3 changes: 2 additions & 1 deletion control.go
@@ -453,7 +453,8 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
Logger.Printf("control: error executing %q: %v\n", statement, iter.err)
}

-q.attempts++
+metric := q.getHostMetrics(c.getConn().host)
+metric.Attempts++
if iter.err == nil || !c.retry.Attempt(q) {
break
}
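The control-connection change above swaps q.attempts++ for a per-host lookup. getHostMetrics is defined in one of the changed files this page did not render; a hedged sketch of what such a lookup presumably does (lazily creating the host's entry on first use):

// Hypothetical sketch, not the actual implementation.
func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
	address := host.ConnectAddress().String()
	metrics, ok := q.metrics[address]
	if !ok {
		// First attempt against this host: start fresh counters.
		metrics = &queryMetrics{}
		q.metrics[address] = metrics
	}
	return metrics
}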
6 changes: 4 additions & 2 deletions policies_test.go
@@ -263,8 +263,9 @@ func TestSimpleRetryPolicy(t *testing.T) {
{5, false},
}

+q.metrics = make(map[string]*queryMetrics)
for _, c := range cases {
-q.attempts = c.attempts
+q.metrics["127.0.0.1"] = &queryMetrics{Attempts: c.attempts}
if c.allow && !rt.Attempt(q) {
t.Fatalf("should allow retry after %d attempts", c.attempts)
}
@@ -347,8 +348,9 @@ func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
{16, false, reu1, Retry},
}

+q.metrics = make(map[string]*queryMetrics)
for _, c := range cases {
-q.attempts = c.attempts
+q.metrics["127.0.0.1"] = &queryMetrics{Attempts: c.attempts}
if c.retryType != rt.GetRetryType(c.err) {
t.Fatalf("retry type should be %v", c.retryType)
}
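Both retry-policy tests now seed q.metrics with a single-host entry before calling rt.Attempt(q), which suggests the policy derives its attempt count by totalling the per-host map instead of reading a scalar field. A hedged sketch of the accessor this implies (again, the defining file is not shown on this page):

// Assumed accessor: a query's total attempts is the sum across all hosts.
func (q *Query) Attempts() int {
	attempts := 0
	for _, metric := range q.metrics {
		attempts += metric.Attempts
	}
	return attempts
}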
