Skip to content

Commit

Permalink
fix(requestmanager): make termination predictable
Browse files Browse the repository at this point in the history
termination from remote peers was not properly handling blocks included up to termination, and could
include unpredictable amounts in the response channel
  • Loading branch information
hannahhoward committed Oct 1, 2021
1 parent 9776ae5 commit a1036c8
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
1 change: 1 addition & 0 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type inProgressRequestStatus struct {
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
traverserCancel context.CancelFunc
}

// PeerHandler is an interface that can send requests to peers
Expand Down
10 changes: 8 additions & 2 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
LinkBudget: int64(rm.maxLinksPerRequest),
}
}
// the traverser has its own context because we want to fail on block boundaries, in the executor,
// and make sure all blocks included up to the termination message
// are processed and passed in the response channel
ctx, cancel := context.WithCancel(rm.ctx)
ipr.traverserCancel = cancel
ipr.traverser = ipldutil.TraversalBuilder{
Root: cidlink.Link{Cid: ipr.request.Root()},
Selector: ipr.request.Selector(),
Visitor: func(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
select {
case <-ipr.ctx.Done():
case <-ctx.Done():
case ipr.inProgressChan <- graphsync.ResponseProgress{
Node: node,
Path: tp.Path,
Expand All @@ -126,7 +131,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
Chooser: ipr.nodeStyleChooser,
LinkSystem: rm.linkSystem,
Budget: budget,
}.Start(ipr.ctx)
}.Start(ctx)
}

ipr.state = running
Expand Down Expand Up @@ -165,6 +170,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
ipr.cancelFn()
rm.asyncLoader.CleanupRequest(requestID)
if ipr.traverser != nil {
ipr.traverserCancel()
ipr.traverser.Shutdown(rm.ctx)
}
// make sure context is not closed before closing channels (could cause send
Expand Down
7 changes: 5 additions & 2 deletions testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ func ReadNResponses(ctx context.Context, t TestingT, responseChan <-chan graphsy
var returnedBlocks []graphsync.ResponseProgress
for i := 0; i < count; i++ {
select {
case blk := <-responseChan:
case blk, ok := <-responseChan:
if !ok {
require.FailNowf(t, "Channel closed early", "expected %d messages, got %d", count, len(returnedBlocks))
}
returnedBlocks = append(returnedBlocks, blk)
case <-ctx.Done():
t.Fatal("Unable to read enough responses")
require.FailNow(t, "Unable to read enough responses")
}
}
return returnedBlocks
Expand Down

0 comments on commit a1036c8

Please sign in to comment.