Speculative query execution #1178
Conversation
Fixed the locking mechanism and Travis is now happy.
Force-pushed from 9d0efa3 to 95e91c2.
Restructured the work. Now it's in 4 specific commits, each with a self-descriptive commit message, and each can be looked at separately.
query_executor.go
Outdated
// if it's not the first attempt, pause
if specExecCounter > 0 {
	<-time.After(sp.Delay())
Reuse a timer instead of using time.After. This should also have a way to cancel; for now, just doing a select on ctx.Done() should be enough, though that would need exposing from the query.
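For illustration, a minimal sketch of that suggestion, assuming the sp policy with Delay()/Executions() used elsewhere in this PR, a ctx exposed from the query, and a hypothetical launch callback that starts one execution:

// Sketch only (not the PR's code): allocate one timer, reuse it between
// attempts, and cancel via the query context instead of using time.After.
timer := time.NewTimer(sp.Delay())
defer timer.Stop()
for specExecCounter := 0; specExecCounter < sp.Executions(); specExecCounter++ {
	launch() // hypothetical: start one execution
	select {
	case <-timer.C:
		// the timer has fired, so it is safe to reset it for the next pause
		timer.Reset(sp.Delay())
	case <-ctx.Done():
		// the query was cancelled; stop scheduling further executions
		return
	}
}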
Will do.
query_executor.go
Outdated
hostIter := q.policy.Pick(qry)
sp := qry.speculativeExecutionPolicy()

results := make(chan queryResponse)
Buffer this to the number of possible executions so goroutines don't leak.
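For illustration, a hedged sketch of the buffered channel, reusing the names from the snippet above (the exact capacity is an assumption):

// Sketch: buffering to the number of possible executions lets a late goroutine
// always deliver its result and exit, even after the caller stops reading.
results := make(chan queryResponse, sp.Executions())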
Thanks, that's actually a great idea.
query_executor.go
Outdated
	RetryableQuery
}

type queryExecutor struct {
	pool   *policyConnPool
	policy HostSelectionPolicy
	specWG sync.WaitGroup
We need one of these per query; define it locally in the execution loop.
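A rough sketch of that suggestion, with the WaitGroup declared per query inside the execution path rather than on the shared struct (the surrounding calls mirror ones visible later in this PR, but the exact shape is an assumption):

// Sketch only: a per-query WaitGroup, so concurrent queries don't share state.
var specWG sync.WaitGroup
specWG.Add(1)
go q.run(qry, &specWG, results, stop) // main execution
// speculative executions would also Add(1) before being launched
specWG.Wait()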
will do.
query_executor.go
Outdated
	res             queryResponse
	specExecCounter int
)
for selectedHost := hostIter(); selectedHost != nil && specExecCounter < sp.Executions(); selectedHost = hostIter() {
I'm not sure I like this logic very much. I was thinking this would look something more like
timer := time.NewTimer(speculation.After())
defer timer.Stop()

ctx, cancel := context.WithCancel(q.Context())
defer cancel()

results := make(chan *Iter, speculation.Attempts())
for i := 0; i < speculation.Attempts(); i++ {
	go queryAttempt(ctx, hostIter, results)

	select {
	case <-timer.C:
		timer.Reset(speculation.After())
	case iter := <-results:
		return iter, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
And have all retrying logic handled within queryAttempt somehow. This may require adjustments to the hostIter interface to say if we have more hosts to try.
Should query speculation use the same host iterator for each query or a new one?
Considering that each speculative "execution" is defined per node, I think we need the same host iterator in order to go only once over all the endpoints. We don't want a different query to try a node that has already been tried.
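A hedged sketch of that idea (illustrative names; whether the iterator needs extra locking is an assumption, not something decided in this thread):

// Sketch: the iterator is created once per query and shared by all executions,
// so no host is handed out twice; a hypothetical mutex serialises access.
hostIter := q.policy.Pick(qry)
var mu sync.Mutex
nextHost := func() SelectedHost {
	mu.Lock()
	defer mu.Unlock()
	return hostIter()
}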
query_executor.go
Outdated
	}
}

func (q *queryExecutor) executeNormalQuery(qry ExecutableQuery) (*Iter, error) {
I'm not keen on how this requires having 2 implementations of a very similar function to execute a query if it is speculatable.
I totally agree. Just haven't found an elegant way to do it yet.
* Metrics are now split into: hostMetrics - for a list of metrics; queryMetrics - for a map and a locker
* Added functions to perform locked metrics updates/reads
* Locking is private for the metrics only, so should have no performance effects

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Define the speculative policy
* Add NonSpeculative policy
* Add SimpleSpeculative policy

Signed-off-by: Alex Lourie <alex@instaclustr.com>

Signed-off-by: Alex Lourie <alex@instaclustr.com>

* Refactor executeQuery to execute main code in a separate goroutine
* Handle speculative/non-speculative cases separately
* Add TestSpeculativeExecution test

Signed-off-by: Alex Lourie <alex@instaclustr.com>
Force-pushed from 9d75db9 to 3890f2e.
* Make one code path for all executions
* Simplify the results handling
* Update the tests

Signed-off-by: Alex Lourie <alex@instaclustr.com>
Force-pushed from 3890f2e to 5630214.
@Zariel I think it's much better, clearer, and kind of elegant now. Would love your feedback. Thanks.
I really like this implementation and just have a couple nits.
session.go
Outdated
	q.metrics.l.Lock()
	defer q.metrics.l.Unlock()
	attempts := 0
	for _, metric := range q.metrics {
I'd handle this the same way with the unlock as you did in the other functions below – and not take the defer hit and unlock after the for loop. There's no other code path executed so the defer isn't necessary.
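A sketch of what that could look like (field names such as q.metrics.m and metric.Attempts are assumptions, since only fragments of the metrics struct are visible in this diff):

// Sketch: lock, sum, and unlock explicitly after the loop instead of deferring,
// since there is no other code path that could skip the unlock.
q.metrics.l.Lock()
attempts := 0
for _, metric := range q.metrics.m { // hypothetical map field
	attempts += metric.Attempts
}
q.metrics.l.Unlock()
return attempts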
session.go
Outdated
func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics {
	q.metrics.l.Lock()
	defer q.metrics.l.Unlock()
	hostMetrics, exists := q.metrics[host.ConnectAddress().String()]
See my comment below.
policies.go
Outdated
type NonSpeculativeExecution struct{}

func (sp NonSpeculativeExecution) Attempts() int        { return 0 }
func (sp NonSpeculativeExecution) Delay() time.Duration { return 1 }
Why is the delay 1 in the non-speculative execution policy? 🤔
It's just a delay :-). I can return 0 as well, but it's irrelevant, as the delay is not used in the non-speculative execution flow. It is used in the Ticker creation, though, so it must be positive, so I picked 1.
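For context, time.NewTicker panics when given a non-positive duration, so even the no-op policy has to return something positive; a minimal illustration:

// time.NewTicker panics if d <= 0, which is why Delay() returns 1 (i.e. 1ns)
// rather than 0, even though the value is never meaningfully waited on.
ticker := time.NewTicker(NonSpeculativeExecution{}.Delay())
defer ticker.Stop()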
query_executor.go
Outdated
// Exit if the query was successful
// or no retry policy defined or retry attempts were reached
if rt == nil || iter.err == nil || !rt.Attempt(qry) {
Reorder the statements according to the comment (which is also the logical order to check, even though, yes, it doesn't really matter – a nit is a nit is a nit):
if iter.err == nil || rt == nil || !rt.Attempt(qry) {
Right, came back after I read your comment over in the other PR. I think we should take another stab at pulling #1151, #1164, and #1162 together into a coherent whole so that we can put speculative on top. I'm not harping on your implementation here – it reads really fluidly. What it doesn't do right now is handle the case of a non-idempotent query being executed speculatively, and we could do that if #1162 were implemented, by explicitly stopping retries in cases like write errors. On the other hand… one has to specifically turn on speculative execution on a query, right? 🤔 I may be overthinking this in that case.
@annismckenzie Yea, I was wondering whether you'd get back to this :-) So indeed, it is possible that a non-idempotent query will be retried, but definitely not in a speculative execution. Speculative in this case means that there are multiple executions running at the same time, and the first one to get a response wins. We take care of that at the entrance to executeQuery by forcing the speculative execution policy to NonSpeculative if the query is not idempotent.
Back to the issue: I'm all for fixing the retries to respect idempotence, and would definitely be for taking another stab at error handling. I'm not overly sure these depend on this work; they can be layered in any order. It could be pretty decent in size, especially if we include the retries/idempotence handling and actually wrapping errors in their own types. Do you need a hand with that one?
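A hedged sketch of that guard as described (the IsIdempotent accessor is an assumption about the query API, not quoted from this PR):

// Sketch: fall back to the no-op policy unless the query is explicitly marked
// idempotent, so non-idempotent queries never run speculatively.
sp := qry.speculativeExecutionPolicy()
if !qry.IsIdempotent() { // hypothetical accessor for the idempotent flag
	sp = NonSpeculativeExecution{}
}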
I would really, really, really appreciate that! 🤝 I'll text you via other channels.
👍
query_executor.go
Outdated
	continue
default:
	// Undefined?
	results <- queryResponse{iter: iter, err: iter.err}
We had the same problem over in #1151. Ideally, we'd just want to panic here.
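A sketch of that suggestion, with the switch reconstructed from the snippet above (the message text is illustrative):

switch retryType := rt.GetRetryType(iter.err); retryType {
case Retry:
	continue
default:
	// an unknown retry type is a programming error; fail loudly instead of
	// silently returning the iterator
	panic(fmt.Sprintf("gocql: unknown retry type %v", retryType))
}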
👍
query_executor.go
Outdated
}

func (q *queryExecutor) run(qry ExecutableQuery, specWG *sync.WaitGroup, results chan queryResponse, stop chan struct{}) {

drop this blank line
Force-pushed from 02e3140 to e66f4b7.
* Metric lock improvements
* Style cleanups

Signed-off-by: Alex Lourie <alex@instaclustr.com>
Force-pushed from e66f4b7 to 20bd1a2.
Signed-off-by: Alex Lourie <alex@instaclustr.com>
…iveExecution_1083_new
Force-pushed from 5593eca to b0cfada.
Signed-off-by: Alex Lourie <alex@instaclustr.com>
Force-pushed from b0cfada to 813b288.
@Zariel mind having another look? It's been lying around for a while... Thanks!
	go q.run(qry, &specWG, results, stop)
case <-qry.GetContext().Done():
	// not starting additional executions
	return
Is it possible that the return happens here and specWG.Wait() waits forever if the context is done because the for loop hasn't done all its iterations? Could it, with bad luck (hitting the continues in run), block the whole executeQuery?
No, it won't. We actually already have at least 1 execution added to the waiting group in line 55 (the main execution). Also, the run will only execute on N nodes and then finish, so no blocking should be happening there.
Yep, I see
func (t *testRetryPolicy) GetRetryType(err error) RetryType {
	return Retry
}
Test looks good. It'd be great if we could have:
- A test where the context gets done
- A test with delay zero (I'd like this to run with -race), which I believe is a valid use case
Sure, I'll have a go at adding that.
fixes #1083
This is an attempt to implement speculative query execution. I think it more or less covers the feature, but I clearly could have missed some things.
This is still a bit of a WIP, so any feedback is welcome @Zariel (and @annismckenzie if you have the time).
Thanks!