-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove main thread block on allocation #216
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,6 @@ type alternateQueue struct { | |
|
||
// Allocator indicates a mechanism for tracking memory used by a given peer | ||
type Allocator interface { | ||
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error | ||
ReleaseBlockMemory(p peer.ID, amount uint64) error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So now an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just that go pattern of defining the interface at the site of usage. I guess we could rename it? Not sure. |
||
} | ||
|
||
|
@@ -113,16 +112,8 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp | |
|
||
// ProcessResponse ingests new responses and completes asynchronous loads as | ||
// necessary | ||
func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, | ||
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, | ||
blks []blocks.Block) { | ||
totalMemoryAllocated := uint64(0) | ||
for _, blk := range blks { | ||
totalMemoryAllocated += uint64(len(blk.RawData())) | ||
} | ||
select { | ||
case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated): | ||
case <-al.ctx.Done(): | ||
} | ||
select { | ||
case <-al.ctx.Done(): | ||
case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,7 +59,7 @@ type PeerHandler interface { | |
// results as new responses are processed | ||
type AsyncLoader interface { | ||
StartRequest(graphsync.RequestID, string) error | ||
ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, | ||
ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, | ||
blks []blocks.Block) | ||
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult | ||
CompleteResponsesFor(requestID graphsync.RequestID) | ||
|
@@ -278,17 +278,15 @@ type processResponseMessage struct { | |
p peer.ID | ||
responses []gsmsg.GraphSyncResponse | ||
blks []blocks.Block | ||
response chan error | ||
} | ||
|
||
// ProcessResponses ingests the given responses from the network | ||
// and updates the in progress requests based on those responses. | ||
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, | ||
blks []blocks.Block) { | ||
response := make(chan error, 1) | ||
err := rm.sendSyncMessage(&processResponseMessage{p, responses, blks, response}, response, nil) | ||
if err != nil { | ||
log.Warnf("ProcessResponses: %s", err) | ||
select { | ||
case rm.messages <- &processResponseMessage{p, responses, blks}: | ||
case <-rm.ctx.Done(): | ||
} | ||
} | ||
|
||
|
@@ -485,12 +483,8 @@ func (prm *processResponseMessage) handle(rm *RequestManager) { | |
filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p) | ||
rm.updateLastResponses(filteredResponses) | ||
responseMetadata := metadataForResponses(filteredResponses) | ||
rm.asyncLoader.ProcessResponse(prm.p, responseMetadata, prm.blks) | ||
rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks) | ||
rm.processTerminations(filteredResponses) | ||
select { | ||
case <-rm.ctx.Done(): | ||
case prm.response <- nil: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why isn't sending There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prm.response as a field was removed in this PR I think. Waiting on the response channel was the essentially what was used to block on allocation, but that's now just being done by allocating directly in the receive handler. |
||
} | ||
} | ||
|
||
func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If all the message receivers use the same allocator, would this still cause all other receivers to be blocked on memory allocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The allocator takes a peer as a parameter because it tracks memory allocations per peer as well as in total. There's a total limit and a per-peer limit. So if peer X tries to allocate above a certain amount, it will get held up separately, long before the total limit across peers gets hit.