Skip to content

Commit

Permalink
closer
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jul 3, 2024
1 parent a06156c commit 9c35f18
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 116 deletions.
197 changes: 113 additions & 84 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
package decision

import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"sync"
"time"

"github.com/google/uuid"

wl "github.com/ipfs/boxo/bitswap/client/wantlist"
"github.com/ipfs/boxo/bitswap/internal/defaults"
bsmsg "github.com/ipfs/boxo/bitswap/message"
Expand Down Expand Up @@ -131,7 +133,7 @@ type PeerEntry struct {
// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)
Wants(p peer.ID, e wl.Entry, limit int) bool

// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool
Expand Down Expand Up @@ -675,14 +677,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
return false
}

newWorkExists := false
defer func() {
if newWorkExists {
e.signalNewWork()
}
}()

wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
if err != nil {
// This is a truely broken client, let's kill the connection.
log.Warnw(err.Error(), "local", e.self, "remote", p)
return true
}

// Get block sizes
wantKs := cid.NewSet()
Expand All @@ -701,90 +701,59 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
e.peerLedger.ClearPeerWantlist(p)
}

var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
if !e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer)) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) {
// filteredWants at limit, ignore remaining wants from request.
log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants))
break
}
}
wants = wants[len(filteredWants):]
for i := range wants {
wants[i] = bsmsg.Entry{} // early GC
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}

// Ensure sufficient space for new wants.
s := e.peerLedger.WantlistSizeForPeer(p)
available := int(e.maxQueuedWantlistEntriesPerPeer) - s
if len(wants) > available {
needSpace := len(wants) - available
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
// Cancel any wants that are being requested again. This makes room
// for new wants and minimizes that existing wants to cancel that
// are not in the new request.
for _, entry := range wants {
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
needSpace--
if needSpace == 0 {
break
}
}
if len(overflow) != 0 {
// Sort wl and overflow from least to most important.
peerWants := e.peerLedger.WantlistForPeer(p)
slices.SortFunc(peerWants, func(a, b wl.Entry) int {
return cmp.Compare(a.Priority, b.Priority)
})
slices.SortFunc(overflow, func(a, b bsmsg.Entry) int {
return cmp.Compare(a.Entry.Priority, b.Entry.Priority)
})

// Put overflow wants onto the request queue by replacing entries that
// have the same or lower priority.
var replace int
for _, entry := range overflow {
if entry.Entry.Priority <= peerWants[replace].Priority {
// Everything in peerWants is equal or more improtant, so this
// overflow entry cannot replace any existing wants.
continue
}
// Cancel additional wants, that are not being replaced, to make
// room for new wants.
if needSpace != 0 {
wl := e.peerLedger.WantlistForPeer(p)
for i := range wl {
entCid := wl[i].Cid
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
entCid := peerWants[replace].Cid
replace++
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
}
}

for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
e.peerLedger.Wants(p, entry.Entry, int(e.maxQueuedWantlistEntriesPerPeer))
wants = append(wants, entry)

Check warning on line 745 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L739-L745

Added lines #L739 - L745 were not covered by tests
}
}

for _, entry := range cancels {
c := entry.Cid
if c.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c)
if e.peerLedger.CancelWant(p, c) {
e.peerRequestQueue.Remove(c, p)
}
}

e.lock.Unlock()

var activeEntries []peertask.Task
Expand All @@ -795,7 +764,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
if e.sendDontHaves && entry.SendDontHave {
c := entry.Cid

newWorkExists = true
isWantBlock := false
if entry.WantType == pb.Message_Wantlist_Block {
isWantBlock = true
Expand Down Expand Up @@ -833,8 +801,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
continue
}
// The block was found, add it to the queue
newWorkExists = true

isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
Expand All @@ -860,19 +826,64 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
})
}

// Push entries onto the request queue
if len(activeEntries) > 0 {
// Push entries onto the request queue and signal network that new work is ready.
if len(activeEntries) != 0 {
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
e.updateMetrics()
e.signalNewWork()
}
return false
}

/*
// Ensure sufficient space for new wants.
s := e.peerLedger.WantlistSizeForPeer(p)
available := int(e.maxQueuedWantlistEntriesPerPeer) - s
if len(wants) > available {
needSpace := len(wants) - available
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
// Cancel any wants that are being requested again. This makes room
// for new wants and minimizes that existing wants to cancel that
// are not in the new request.
for _, entry := range wants {
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
// Cancel additional wants, that are not being replaced, to make
// room for new wants.
if needSpace != 0 {
wl := e.peerLedger.WantlistForPeer(p)
for i := range wl {
entCid := wl[i].Cid
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
}
}
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
}
}
*/

// Split the want-havek entries from the cancel and deny entries.
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) {
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) {
entries := m.Wantlist() // creates copy; safe to modify
if len(entries) == 0 {
return nil, nil, nil
return nil, nil, nil, nil
}

log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
Expand All @@ -881,18 +892,27 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
var cancels, denials []bsmsg.Entry

for _, et := range entries {
c := et.Cid
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
if c.Prefix().MhType == mh.IDENTITY {
return nil, nil, nil, errors.New("peer canceled an identity CID")
}

if et.Cancel {
cancels = append(cancels, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) {
if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}
Expand All @@ -904,10 +924,19 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
wants = nil
}

// Do not take more wants that can be handled.
if len(wants) > int(e.maxQueuedWantlistEntriesPerPeer) {
// Keep the highest priority wants.
slices.SortFunc(wants, func(a, b bsmsg.Entry) int {
return cmp.Compare(b.Entry.Priority, a.Entry.Priority)
})
wants = wants[:int(e.maxQueuedWantlistEntriesPerPeer)]

Check warning on line 933 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L929-L933

Added lines #L929 - L933 were not covered by tests
}

// Clear truncated entries.
clear(entries[len(wants):])

return wants, cancels, denials
return wants, cancels, denials, nil
}

// ReceivedBlocks is called when new blocks are received from the network.
Expand Down
Loading

0 comments on commit 9c35f18

Please sign in to comment.