Add async option to Replay() #8892
I see that you haven't updated any CHANGELOG files. Would it make sense to do so?
replayCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
	// merge log poller context and parent context. If either of them are cancelled, abort replay
	// This is so that lp.run() can pass a single context to lp.PollAndSaveLogs(). Not used in this
	// function since we want to return differentiate between the two, for returning proper error code
	select {
	case <-lp.ctx.Done():
		cancel()
	case <-ctx.Done():
		if !async {
			// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
			// log poller is shutting down). This is important for a web controller, since it will always
			// cancel the context as soon as the request is complete, which will likely be before the replay is done.
			cancel()
		}
	case <-replayCtx.Done():
	}
}()
This is extremely difficult to read and seems brittle since it tightly couples implementation details of systems that shouldn't know anything about each other. Why can't the caller own the context lifetime like usual?
Actually, why not simply this?
-replayCtx, cancel := context.WithCancel(ctx)
-defer cancel()
-go func() {
-	// merge log poller context and parent context. If either of them are cancelled, abort replay
-	// This is so that lp.run() can pass a single context to lp.PollAndSaveLogs(). Not used in this
-	// function since we want to return differentiate between the two, for returning proper error code
-	select {
-	case <-lp.ctx.Done():
-		cancel()
-	case <-ctx.Done():
-		if !async {
-			// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
-			// log poller is shutting down). This is important for a web controller, since it will always
-			// cancel the context as soon as the request is complete, which will likely be before the replay is done.
-			cancel()
-		}
-	case <-replayCtx.Done():
-	}
-}()
+if async {
+	ctx = lp.ctx
+}
The contexts need to be merged for the synchronous case, but you have a good point... I think for the async case it could be something similar to ctx = lp.ctx, although that would have to happen later, not here. Thanks for the suggestion, it's at least in the right direction.
This goroutine for merging the contexts doesn't necessarily have to be spawned by this function; it could be moved into lp.run(). I considered both possibilities and decided this one made slightly more sense, but I'm open to moving it if there's a good reason.
I've tried several simpler ideas in earlier PRs to avoid merging the contexts, and kept thinking there must be a better way. Each of them ended up falling short in one way or another, so I closed the PRs. I eventually decided that @connorwstein was right: without a major re-architecture of how replays work in logpoller, this is the best option that does everything we need. Let's discuss offline if you have other ideas, as it's probably too complex for github.
The contexts need to be merged for the synchronous case
Why must they be merged? In what scenario is lp.ctx done but not the request ctx?
as it's probably too complex for github.
I disagree. This requires a nuanced discussion alongside the code.
Storing a context on the log poller in the first place is a misuse of the context API, but regardless of which type you store there is no need to merge the signals. The stored context only needs to be wired to things that don't already have one of their own - i.e. background routines literally using context.Background(). Incoming requests with a context arg will already be cancelled by the system on shutdown, or will encounter systems that have shut down and return an appropriate error. What difference does it make if we race to cancel the request context from another place?
I guess I'm a little confused about why the request context wouldn't already have the server context as a parent to begin with.
Within Google's codebase, where the context package originated, we follow the rule that a context.Context should only be passed around via the call stack.
...
This rule means that at any point in the call stack, there should be exactly one applicable Context, received as a function parameter. When following this pattern, the merge operation never makes sense.
While merging context cancellation signals is straightforward, merging context values is not. Contexts can contain trace IDs and other information; which value would we pick when merging two contexts?
// In the async case, we don't want to cancel the replay after we've returned to the caller (unless the
// log poller is shutting down). This is important for a web controller, since it will always
// cancel the context as soon as the request is complete, which will likely be before the replay is done.
But context cancellation isn't prevented by avoiding this particular call, because the replayCtx is derived directly from the original ctx argument and will automatically be cancelled.
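A small standalone sketch of that behaviour, using only the standard library: a context created with `context.WithCancel(parent)` is cancelled when the parent is, whether or not its own cancel function is ever called. The function name `derivedIsCancelled` is hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// derivedIsCancelled reports whether a context derived with
// context.WithCancel is done after only its parent has been cancelled.
func derivedIsCancelled() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild() // releases resources on return; runs after the check below

	cancelParent() // the child's own cancel function has not been called yet

	<-child.Done() // unblocks because cancellation propagates from the parent
	return child.Err() == context.Canceled
}

func main() {
	fmt.Println("derived context cancelled:", derivedIsCancelled())
}
```

So skipping `cancel()` in the `<-ctx.Done()` branch changes nothing for a `replayCtx` built from `ctx`; keeping the replay alive past the request would need a context that does not descend from the request context.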
Ah! I was wondering why these tests weren't passing. I kept tweaking the tests today thinking the problem was with them, but you're right, this is a bug... in fact, so is the defer cancel() line, for the same reason. Thanks for looking at it; I guess I need to refactor some things.
	return ErrReplayAbortedOnShutdown
case <-ctx.Done():
	cancel()
These are redundant, given the defer up top.
@@ -31,7 +31,7 @@ import (
 //go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go
 type LogPoller interface {
 	services.ServiceCtx
-	Replay(ctx context.Context, fromBlock int64) error
+	Replay(ctx context.Context, fromBlock int64, async bool) error
Did you consider a separate method?
-	Replay(ctx context.Context, fromBlock int64, async bool) error
+	Replay(ctx context.Context, fromBlock int64) error
+	ReplayAsync(fromBlock int64) error
It seems like it would simplify the context situation. Could it just piggyback on the other method like this?
func (lp *logPoller) ReplayAsync(fromBlock int64) error {
	go lp.Replay(lp.ctx, fromBlock)
	return nil
}
An async option has been added to lp.Replay(), so it can be called either synchronously or asynchronously. This allows us to call it asynchronously from the CLI, but synchronously from other services.
This also makes synchronous Replay calls slightly more robust. Previously, once the replay had started, the only way to stop it was by cancelling the caller's context. Now it also stops immediately if the logpoller is shut down before the caller cancels.
For async calls, the replay can be cancelled at any time by shutting down the log poller service. It can also be cancelled by the caller after Replay() is called but before the main logpoller run loop receives the replay request. As soon as the request is received, Replay() returns with no error, indicating that the replay request was successfully received and the replay has begun. After Replay() returns to the caller, the only way to cancel the replay is by shutting down the logpoller.