Query metric per host 1155 #1156
Conversation
Returning the favor because you reviewed my PR as well – hope that's okay. 😄
session_test.go
Outdated
t.Fatalf("expceted batch.Attempts() to return %v, got %v", 1, b.Attempts())
b.metrics[ip] = &queryMetric{attempts: 1}
if b.Attempts(host) != 1 {
	t.Fatalf("expceted batch.Attempts() to return %v, got %v", 1, b.Attempts(host))
Fix the expceted spelling while you're here?
sure :-)
session.go
Outdated
s.mu.RUnlock()
}

func initializeMetrics(s *Session) map[string]*queryMetric {
Any reason why this is a lone function and not func (q *Query) initializeMetrics()?
the main reason is that the Query and Batch initialisers are the same and don't really need to work on a query object.
On the other hand, acknowledging the comment on the initialiser itself, I will probably get rid of it completely, as it would no longer make sense if the nodes are added later.
session.go
Outdated
@@ -658,6 +658,11 @@ func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn,
	return s.dial(host, s.connCfg, errorHandler)
}

type queryMetric struct {
	attempts int
Make those uint64 because there's no good reason why they should ever be negative.
agreed.
session.go
Outdated
defer s.pool.mu.RUnlock()

// Range over all defined IPs and prebuild metrics map
for _, hostConn := range s.pool.hostConnPools {
Is there a possibility that a host is added to the pool after this is done? I'm pretty sure that can happen if you add a new node to the cluster and it's gossiped around the cluster through the control connection. In that case you might panic in the map accesses below.
as mentioned above, will probably remove this completely.
conn_test.go
Outdated
func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
func NewTestServerWithAddress(addr string, t testing.TB, protocol uint8, ctx context.Context) *TestServer {
drop this empty line
conn_test.go
Outdated
@@ -721,6 +805,11 @@ func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServe
	go srv.serve()

	return srv
drop this empty line
session.go
Outdated
q.attempts++
q.totalLatency += end.Sub(start).Nanoseconds()
// TODO: track latencies per host and things as well instead of just total
drop this empty line
session.go
Outdated
b.attempts++
b.totalLatency += end.Sub(start).Nanoseconds()
// TODO: track latencies per host and things as well instead of just total
drop this empty line
session.go
Outdated
}
drop this empty line
@annismckenzie more than OK, I really appreciate it!
@annismckenzie I've updated the PR, would love it if you could have a look. Thanks.
Good progress on making it less complex! Still have a couple nits.
session.go
Outdated
s.mu.RUnlock()
}

func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
	_, exists := q.metrics[host.connectAddress.String()]
Only look it up once in this method?
Considering that this call is used both for a lookup and for initialising a zero value, I really can't think of a more "efficient" or better way to do it, except splitting it into two functions: one to initialise the map and the other to fetch. Personally, I don't think that makes it all that much better, but is that what you're thinking of?
I meant something like this:
func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
metrics, exists := q.metrics[host.connectAddress.String()]
if !exists {
// if the host is not in the map, it means it's been accessed for the first time
metrics = &queryMetrics{attempts: 0, totalLatency: 0}
q.metrics[host.connectAddress.String()] = metrics
}
return metrics
}
yeah, thought about this one too, but didn't have a strong opinion on it... do you think it looks better? If so, I can easily change it.
I think it does look better. 🙈
Done
session.go
Outdated
if b.attempts > 0 {
	return b.totalLatency / int64(b.attempts)
func (b *Batch) Latency(host *HostInfo) int64 {
	hostMetric := b.getHostMetrics(host)
Why doesn't the Batch have a mutex while the Query still does? 🤔
Neither of them has a mutex. The only mutexes in Query (and also in Batch) are the ones embedded in Session, where there are two: one for the session itself and a second for the hostPool access. Am I missing something? :-)
Don't know what I was seeing – weird. Disregard. 😬
@alourie needs a rebase now
@Zariel @annismckenzie okay... So I've rebased onto the latest master, cleaned up the code a bit, and removed the now-unnecessary locks. I also found a small bug in the query_executor, which I fixed, added a separate test case for it for future reference, and finally updated the testRetryPolicy to be a bit more flexible. Travis is now happy; would love your feedback/approval ;-)
I have a couple questions, see my comments.
query_executor.go
Outdated
	return p.GetRetryType(err), nil
}

// p.Attempt is false, should not retry anything
return p.GetRetryType(err), err
The correct fix would be to change this to return Rethrow, err. Then the change below can be dropped again as well.
Well, that's also a problem. If the retry policy is implemented similarly to the new testRetryPolicy, then once those attempts are exhausted it returns RetryNextHost, not Rethrow. If you ignore that, you're throwing an error instead of trying the next host.
That's why I ended up accepting the need to call it here, but checking it in the queryExecutor. I don't like either solution, but at least the current one seems to work correctly.
All of the above is wrong. That is indeed the correct fix; I had misunderstood how RetryPolicy works, hence the fix above. So will implement.
Would you mind if I fix this in my branch because I'm working on #1161 and it touches the same code path again in the query executor?
session.go
Outdated
@@ -826,8 +846,8 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
	End:  end,
	Rows: iter.numRows,
	Host: host,
	Metrics: q.metrics[host.ConnectAddress().String()],
Pass hostMetric as it contains the correct information (and it is also not currently used in this function anyway?).
you're right, don't know what I was thinking...will fix
session.go
Outdated
@@ -713,14 +729,18 @@ func (q Query) String() string {
}

//Attempts returns the number of times the query was executed.
func (q *Query) Attempts() int {
	return q.attempts
func (q *Query) Attempts(host *HostInfo) uint64 {
The main gripe I have with this approach is that it a) breaks backwards compatibility in a major way and b) now produces different behavior even with the default SimpleRetryPolicy. The metrics are tracked by host, and the default retry type of the SimpleRetryPolicy is RetryNextHost, which means the attempts are now also per host. The max retries are therefore also (that's not correct) not honored, because on each successive retry (on the next host) the attempts for that host will be 0, so it will exhaust the host pool and fail with a different error than before. Mind you, I'm not harping on the general direction or the feature (it is very useful), but maybe leave the current signatures alone, keep track of a summed version of attempts and latencies (as it is right now), and make the per-host latencies and attempts available to the retry policy through some other means, so that going forward one can create an advanced retry policy that takes those into account? 🤔 number of hosts * max retries - (number of hosts - 1) (don't ask, the gist: it fills
Hmmm... now I'm kinda confused.
So, you're right: the current queryExecutor only checks that the total accumulated attempts across all hosts have not surpassed the number set in the retry policy, and this approach changes that. I was wrong in my understanding of what it does.
So... I think I understand it now. What I will probably do is introduce a way to find the total count by summing the map values; I'll also see how to keep the signatures the same.
Thanks for the review!
Totally, go ahead. I planned to look at #1161 if you can't ;-)
Nah, already deep in the weeds on #1161. I'll count on your 👀 though. 😬
Went through it again, and restoring the public API was the right call (don't want people running away because of a high number of breaking changes) – I do have some thoughts. 😬
conn_test.go
Outdated
func (o *testQueryObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
	Logger.Printf("Observed query %q. Returned %v rows, took %v on host %q. Error: %q\n", q.Statement, q.Rows, q.End.Sub(q.Start), q.Host.ConnectAddress().String(), q.Err)
	if o.metrics == nil {
I'd drop this because it's noisy. It's a test, so initialize metrics when creating the observer below.
conn_test.go
Outdated
Logger.Printf("Observed query %q. Returned %v rows, took %v on host %q. Error: %q\n", q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Err)
}

// Uncomment and add verbosity?
Drop this and move the missing log entries q.Metrics.attempts and q.Metrics.totalLatency into the log message above.
control.go
Outdated
@@ -453,7 +453,11 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
	Logger.Printf("control: error executing %q: %v\n", statement, iter.err)
}

q.attempts++
host := c.getConn().host
Collapse this down to:
c.session.mu.Lock()
q.getHostMetrics(c.getConn().host).attempts++
c.session.mu.Unlock()
It's just a lot of variables for no purpose. You can leave the host :=
line but I'd definitely collapse the attempt increase because it reads better.
Actually, this whole business of increasing the attempts in here is wrong and it needs to go. The query constructed above doesn't have a retry policy (and rightly so), but it will nevertheless run through the whole query execution chain, so it will also run through the query executor and end up calling attempt() on the query below, which will handle recording the metrics. Am I missing something?
In that case you can just drop all of those lines.
As there's only one instance of the query running at any one time (it's blocking and wrapped in a for loop) a lock wouldn't be necessary anyway but yeah, dropping the whole code block is better I guess. 😅
I'm not sure this one follows the regular query execution. It seems we're constructing and executing the statement directly on the conn, not via the regular query mechanisms. The retry policy is not constructed, which I reckon means it would be set to SimpleRetryPolicy{numAtt: 3}... so we still need to count attempts, but probably without the locking... :-)
conn_test.go
Outdated
// q.Statement, q.Rows, q.End.Sub(q.Start), host, q.Metrics.attempts, q.Metrics.totalLatency, q.Err)
}

func (o *testQueryObserver) GetMetrics(host *HostInfo) *queryMetrics {
no need for an exported function in a test
indeed
conn_test.go
Outdated
@@ -57,13 +58,17 @@ func TestJoinHostPort(t *testing.T) {
	}
}

func testCluster(addr string, proto protoVersion) *ClusterConfig {
	cluster := NewCluster(addr)
func testMultiNodeCluster(addresses []string, proto protoVersion) *ClusterConfig {
Nice, I'll need this too. 👍
Err:     iter.err,
Attempt: b.attempts,
If you're good with my suggestions above, you may want to restore this field and pass b.totalAttempts. On the other hand, query observers could be fine with calculating it from hostMetrics. Your call.
@@ -826,8 +859,8 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
	End:  end,
	Rows: iter.numRows,
	Host: host,
	Metrics: hostMetrics,
I hope it's okay that the query observer will access the fields in hostMetrics while they could be changed – you're protecting write access above, but that mutex is unlocked in line 852. Could that race? 🤔 😅
Pretty sure it's okay as it's serialized.
session.go
Outdated
s.mu.RUnlock()
}

func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
	q.session.mu.Lock()
	defer q.session.mu.Unlock()
As there's no way you're returning from this function early, I'd drop the defer and move the mutex unlock down. It's pretty clear so there's no need for the defer overhead, especially because this could be called a sizable number of times while a query is executing and the retry policy consulted. Come to think of it, making this available to the retry policy isn't implemented yet, right?
No, not implemented, and I'm not sure the retryPolicy needs to deal with metrics at all. But considering that Attempt() gets the whole query, it may in fact use the metrics if relevant for the retry policy.
policies_test.go
Outdated
@@ -263,8 +263,12 @@ func TestSimpleRetryPolicy(t *testing.T) {
	{5, false},
}

// initiate metrics map
I think you can do away with this comment as well as the ones in lines 269 and 354.
:-)
session.go
Outdated
} | ||
s.mu.RUnlock() | ||
return batch | ||
} | ||
|
||
func (b *Batch) getHostMetrics(host *HostInfo) *queryMetrics { | ||
b.mu.Lock() |
The same suggestion I had above about changing this machinery applies to the Batch as well. Shame that it's duplicated. This screams for a separate manager object that holds query attempt metrics as well as latencies and total attempts. It would also include the mutex. Sorry for bringing it up. 😇
I totally agree :-) I've almost implemented it on Friday, just was too sleepy to make sure the tests pass. Hopefully it will resolve all the gripes with the current implementation.
conn_test.go
Outdated
@@ -57,13 +58,17 @@ func TestJoinHostPort(t *testing.T) {
	}
}

func testCluster(addr string, proto protoVersion) *ClusterConfig {
	cluster := NewCluster(addr)
func testMultiNodeCluster(addresses []string, proto protoVersion) *ClusterConfig {
Define it like func testMultiNodeCluster(proto protoVersion, addresses ...string) *ClusterConfig. That way callers don't need to wrap a single addr in a slice.
I wasn't sure that it was OK to change the argument order, but will do now :-)
session.go
Outdated
@@ -714,13 +732,26 @@ func (q Query) String() string {

//Attempts returns the number of times the query was executed.
func (q *Query) Attempts() int {
	return q.attempts
	q.session.mu.Lock()
	defer q.session.mu.Unlock()
Why are we now locking the session mutex around the place?
will change
session.go
Outdated
s.mu.RUnlock()
}

func (q *Query) getHostMetrics(host *HostInfo) *queryMetrics {
	q.session.mu.Lock()
What is this lock protecting?
The metrics map but it's unnecessary as per query there's only ever one attempt running at any given time so retries are serialized and there's no need for locking this structure. If we add speculative retries (@alourie said he'd like to work on that) we'll have to construct a wait group (or some other container for keeping track of parallel query attempts) and the metrics will have to be updated once at the end (first successful result, the other attempts will be canceled), making this approach valid still. I'm pretty sure we can drop it but we should run stress (I recently learned about that from someone 🏅) against the tests to make sure it's safe. But I obviously don't know the codebase like you do, what do you think?
@annismckenzie You're right, I am working on speculative query executions, and indeed I use wait groups to keep track of the parallel query executions. The lock here is protecting against parallel access to the attempts number in the query from different goroutines. But mind you, I built the speculative stuff a week before I started this work, and since then I've discovered a few holes in my understanding of how retry policies and retries in general work. So much could change, and I might remove the locks after all.
integration.sh
Outdated
ccm clear
ccm start
sleep 1s

go test -tags "ccm gocql_debug" -timeout=5m -race $args
go test -tags "ccm gocql_debug" -timeout=15m -race $args
remove unrelated change
of course
@alourie this is a -1 from me due to the lock required on the session; I don't see why this should require locking the session from down inside the query/control connection. The locking is non-obvious and will add a maintenance burden, because now when the code changes the locking needs to be correctly moved too. As well as being non-obvious, it will likely incur overall query performance degradation due to lock contention.
Went through it once more while keeping @Zariel's valid concerns in mind. Maybe we can do away with the locks, see my comments.
query_executor.go
Outdated
@@ -48,7 +48,8 @@ func (q *queryExecutor) checkRetryPolicy(rq ExecutableQuery, err error) (RetryTy
	if p.Attempt(rq) {
		return p.GetRetryType(err), nil
	}
	return p.GetRetryType(err), err
A rebase against the current master will make this go away as the PR has been merged already.
session.go
Outdated
@@ -658,6 +658,11 @@ func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn,
	return s.dial(host, s.connCfg, errorHandler)
}

type queryMetrics struct {
	attempts int
You're passing this to the query observer which can't access unexported fields – you'll have to at least export those.
Interesting point. Should we have an integration suite that's external to the main package? This way we'd catch issues like this.
That might be a bit much – one just has to remember that observers are being used with functions provided by the user. 😆
maybe :-)
session.go
Outdated
hostMetrics := q.getHostMetrics(host)
q.session.mu.Lock()
hostMetrics.attempts++
hostMetrics.totalLatency += end.Sub(start).Nanoseconds()
And that lock should not be necessary if my reasoning above is correct.
@@ -826,8 +859,8 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
	End:     end,
	Rows:    iter.numRows,
	Host:    host,
	Metrics: hostMetrics,
Pretty sure it's okay as it's serialized.
If it requires locking for concurrent metric updating, then it should use much finer-grained locking, per host/per query, or it should just use atomics.
@Zariel @annismckenzie so I've cleared up all the things. Now not using locks at all, as indeed things run sequentially anyway. Will deal with locking when necessary in the speculative query executions work. Additionally:
Could you redo the changes into 4 or 5 logical commits? The last 3 are pretty generic and don't really say what they contain. 😬
@annismckenzie do you mean the whole thing or just the last 3? I can squash it all (or just the last ones), so that it is all just 1 commit. Also, many of the commits are just draft work, you probably have no interest in those anyway.
Well, I see @Zariel usually squashes but that squash merge commit still links to this MR and I'd like it to be comprehensible. I really did mean to just do
I'd find that quite valuable and the squash commit merge is also nice and short.
@annismckenzie uhm, you realize that what you're asking is quite a bit of manual work with git's history, right? ;-) I don't mind doing it, it's just that it will require a force push, and while GitHub will manage that fine, some comments/content may disappear. Is that ok with you?
Yup, exactly what I'm asking. You basically did 3 implementations here, and while it's sometimes valuable to see the evolution of a feature through time, that isn't the case here. We should strive to make the implementation you chose clear (it also helps with issues being reported to point to a specific commit that introduced it).
Signed-off-by: Alex Lourie <alex@instaclustr.com>
Signed-off-by: Alex Lourie <alex@instaclustr.com>
* Now it's possible to spin multi-node test clusters
* Batch tests moved from legacy to session.<> calls.

Signed-off-by: Alex Lourie <alex@instaclustr.com>
Signed-off-by: Alex Lourie <alex@instaclustr.com>
Signed-off-by: Alex Lourie <alex@instaclustr.com>
Force-pushed the branch from 0776f52 to 9446847.
@annismckenzie Done. Hope it is more clear this way.
Very much so! 🎉
yay! Thanks @annismckenzie
session.go
Outdated
if q.attempts > 0 {
	return q.totalLatency / int64(q.attempts)
attempts := 0
var latency int64 = 0
use latency := 0 or var attempts int
don't mix both styles
Done
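For reference, a rough sketch of the per-host aggregation the diff above moves towards: average latency computed by summing attempts and latency over every host's metrics (names and map layout are illustrative, not the actual gocql code):

```go
package main

import "fmt"

// queryMetrics is a simplified stand-in for the per-host struct in this PR.
type queryMetrics struct {
	Attempts     int
	TotalLatency int64
}

// latency averages the accumulated latency across all hosts' attempts,
// returning 0 when nothing has been attempted yet.
func latency(metrics map[string]*queryMetrics) int64 {
	var attempts int
	var total int64
	for _, m := range metrics {
		attempts += m.Attempts
		total += m.TotalLatency
	}
	if attempts > 0 {
		return total / int64(attempts)
	}
	return 0
}

func main() {
	metrics := map[string]*queryMetrics{
		"10.0.0.1": {Attempts: 2, TotalLatency: 200},
		"10.0.0.2": {Attempts: 2, TotalLatency: 600},
	}
	fmt.Println(latency(metrics)) // 200
}
```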
Signed-off-by: Alex Lourie <alex@instaclustr.com>
hostMetrics, exists := b.metrics[host.ConnectAddress().String()]
if !exists {
	// if the host is not in the map, it means it's been accessed for the first time
	hostMetrics = &queryMetrics{Attempts: 0, TotalLatency: 0}
nit: Go will initialize these values to 0, so you don't need to set them explicitly
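A quick illustration of that nit, using a stand-in struct: Go zero-initializes struct fields, so the explicit zeros are redundant:

```go
package main

import "fmt"

// queryMetrics is a stand-in for the struct in the diff above.
type queryMetrics struct {
	Attempts     int
	TotalLatency int64
}

func main() {
	// &queryMetrics{} is equivalent to the verbose form below: composite
	// literals leave omitted fields at their zero values.
	a := &queryMetrics{}
	b := &queryMetrics{Attempts: 0, TotalLatency: 0}
	fmt.Println(*a == *b) // true
}
```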
Attempt to solve #1155
This ended up being a bit bigger than initially expected.
Nevertheless, the majority of the work was updating the tests to fetch the correct host they're doing the work for and send it over as a parameter to other functions. Additionally, it should be somewhat easier to add new metrics to the query execution.
The tests seem to pass successfully now.
@Zariel would love your feedback on this.
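Roughly, the per-host API this PR describes could be exercised like this; the types and the Attempts signature are simplified sketches of the discussion above, not the final gocql API:

```go
package main

import "fmt"

// Simplified stand-ins for the per-host metrics discussed in this PR;
// gocql's real Query/HostInfo types carry much more state.
type queryMetrics struct {
	Attempts     int
	TotalLatency int64
}

type query struct {
	metrics map[string]*queryMetrics
}

// Attempts reports the attempt count for one host, 0 if it was never tried.
func (q *query) Attempts(host string) int {
	if m, ok := q.metrics[host]; ok {
		return m.Attempts
	}
	return 0
}

func main() {
	q := &query{metrics: map[string]*queryMetrics{
		"10.0.0.1": {Attempts: 3},
	}}
	fmt.Println(q.Attempts("10.0.0.1"), q.Attempts("10.0.0.2")) // 3 0
}
```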