-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add request limits #224
Add request limits #224
Conversation
setup a general purpose task queue, refactor functioning of requestmanger -- dispatchign requests to the queue, feeding request data when those queues resume
use a state var to track operational model
make sure not to send cancel when other peer has already sent terminal status
dcff717
to
d5bad97
Compare
requestmanager/executor/executor.go
Outdated
if !isContextErr(err) { | ||
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID())) | ||
} | ||
if !isContextErr(err) && !isPausedErr(err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could skip isContextErr
check here by nesting in the above block, but also is this logic of these two blocks and order correct here? The previous form in run
doesn't check isPausedErr
and it seems strange to skip passing in the error if that's the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous form would not end on IsPausedError -- it handled the pause/resume internally and the request all the way to the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My lgtm isn't worth much here but I don't see any obvious problems.
I like the TaskQueue code though, 👌, fwiw.
ctx context.Context | ||
startTime time.Time | ||
cancelFn func() | ||
p peer.ID | ||
terminalError error | ||
pauseMessages chan struct{} | ||
state state | ||
lastResponse atomic.Value | ||
onTerminated []chan<- error | ||
request gsmsg.GraphSyncRequest | ||
doNotSendCids *cid.Set | ||
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser | ||
inProgressChan chan graphsync.ResponseProgress | ||
inProgressErr chan error | ||
traverser ipldutil.Traverser |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a lot of state for a request status.
can none of these be de-duped? e.g. can you not learn the peer.ID from the request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope unfortuantely you can't.
Thhere are a lot of things to track, but also, I'm totally worried. the block memory trumps this in terms of memory consumption.
requestmanager/executor/executor.go
Outdated
) | ||
|
||
var log = logging.Logger("gs_request_executor") | ||
|
||
// Manager are the Executor uses to interact with the request manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grammar
re.restartNeeded = true | ||
return nil | ||
func (e *Executor) advanceTraversal(rt RequestTask, result types.AsyncLoadResult) error { | ||
if result.Err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could result.Err also be a traversal.SkipMe
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not.
} | ||
request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionDoNotSendCIDs, Data: cidsData}}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this extension need to be tightly interlinked in this logic, rather than abstracted to a general 'extension' interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes cause graphsync directly manages and updates it for pause/resume. BUT, it will still accept the version that is passed in with the call to graphsync.
// that pop tasks and execute them | ||
type WorkerTaskQueue struct { | ||
ctx context.Context | ||
cancelFn func() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
context.CancelFunc
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see external ticket but I will address shortly.
Co-authored-by: Will <will.scott@protocol.ai>
c7ed81e
to
5e9dd10
Compare
add precautionary check to avoid send on close channel
c892fe8
to
81adab5
Compare
81adab5
to
505ed8d
Compare
Goals
implement #215
Implementation
This is not a super simple PR as adding request limits required some sizable lifting to the request manager operations.
Here are the substantive changes:
For discussion
There are several obvious "next steps"/refactors that are NOT included in this PR to reduce the already complicated changeset.
There is one very small breaking change to graphsyncs little known and extremely underused pause request feature -- see #160 for the suggestion this shouldn't even exist. Previously, if you paused a request, it would pause on the next block load, but it would NOT return the IPLD responses for the last block. Now it does return IPLD responses. This is better behavior anyway, and I'm pretty sure no one is using this feature.
While not breaking, requests not executing immediately is a potential source of unexpected behavior and this PR, once merged to master, should be thouroughly vetted in go-data-transfer and lotus prior to merge into higher level products