Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for IPLD prime's budgets feature in selectors #235

Merged
merged 3 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.0
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we ask for a tag of this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do on monday

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before I commit go-graphsync v0.10.0 final

github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.13.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZ
github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e h1:HPLQ9V/OFHKjfbFio8vQV+EW7lpQPj+iPl93VcwSTYM=
github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down Expand Up @@ -539,6 +539,8 @@ github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
Expand Down
22 changes: 20 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type graphsyncConfigOptions struct {
maxInProgressIncomingRequests uint64
maxInProgressOutgoingRequests uint64
registerDefaultValidator bool
maxLinksPerOutgoingRequest uint64
maxLinksPerIncomingRequest uint64
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -136,6 +138,22 @@ func MaxInProgressOutgoingRequests(maxInProgressOutgoingRequests uint64) Option
}
}

// MaxLinksPerOutgoingRequests changes the allowed number of links an outgoing
// request can traverse before failing
func MaxLinksPerOutgoingRequests(maxLinksPerOutgoingRequest uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxLinksPerOutgoingRequest = maxLinksPerOutgoingRequest
}
}

// MaxLinksPerIncomingRequests changes the allowed number of links an incoming
// request can traverse before failing
func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxLinksPerIncomingRequest = maxLinksPerIncomingRequest
}
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
Expand Down Expand Up @@ -179,11 +197,11 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,

asyncLoader := asyncloader.New(ctx, linkSystem, requestAllocator)
requestQueue := taskqueue.NewTaskQueue(ctx)
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager())
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, networkErrorListeners, requestQueue, network.ConnectionManager(), gsConfig.maxLinksPerOutgoingRequest)
requestExecutor := executor.NewExecutor(requestManager, incomingBlockHooks, asyncLoader.AsyncLoad)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager())
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressIncomingRequests, network.ConnectionManager(), gsConfig.maxLinksPerIncomingRequest)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these have gotten pretty unwieldy. we probably want options or some other pattern for these constructors at some point

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they're initialized by the code itself, so I'd probably lean towards a struct, but I also want to move this whole package to an internal directory :) but yes for later

graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
Expand Down
52 changes: 52 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,58 @@ func TestRejectRequestsByDefault(t *testing.T) {
testutil.VerifySingleTerminalError(ctx, t, errChan)
}

func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1(MaxLinksPerOutgoingRequests(linksToTraverse))

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
td.GraphSyncHost2()

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
}

func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
}

func TestGraphsyncRoundTrip(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
10 changes: 10 additions & 0 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TraversalBuilder struct {
Visitor traversal.AdvVisitFn
LinkSystem ipld.LinkSystem
Chooser traversal.LinkTargetNodePrototypeChooser
Budget *traversal.Budget
}

// Traverser is an interface for performing a selector traversal that operates iteratively --
Expand Down Expand Up @@ -81,6 +82,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
visitor: defaultVisitor,
chooser: defaultChooser,
linkSystem: tb.LinkSystem,
budget: tb.Budget,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
Expand Down Expand Up @@ -120,6 +122,7 @@ type traverser struct {
chooser traversal.LinkTargetNodePrototypeChooser
currentLink ipld.Link
currentContext ipld.LinkContext
budget *traversal.Budget
isDone bool
completionErr error
awaitRequest chan struct{}
Expand Down Expand Up @@ -184,6 +187,12 @@ func (t *traverser) start() {
t.writeDone(err)
return
}
if t.budget != nil {
t.budget.LinkBudget--
if t.budget.LinkBudget <= 0 {
t.writeDone(&traversal.ErrBudgetExceeded{BudgetKind: "link", Link: t.root})
}
}
nd, err := t.linkSystem.Load(ipld.LinkContext{Ctx: t.ctx}, t.root, ns)
if err != nil {
t.writeDone(err)
Expand All @@ -201,6 +210,7 @@ func (t *traverser) start() {
LinkSystem: t.linkSystem,
LinkTargetNodePrototypeChooser: t.chooser,
},
Budget: t.budget,
}.WalkAdv(nd, sel, t.visitor)
t.writeDone(err)
}()
Expand Down
58 changes: 54 additions & 4 deletions ipldutil/traverser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ipldutil
import (
"bytes"
"context"
"math"
"testing"
"time"

Expand Down Expand Up @@ -56,7 +57,7 @@ func TestTraverser(t *testing.T) {
testdata.MiddleMapBlock,
testdata.LeafAlphaBlock,
testdata.LeafAlphaBlock,
})
}, nil)
})

t.Run("traverses correctly, blockchain", func(t *testing.T) {
Expand Down Expand Up @@ -86,13 +87,58 @@ func TestTraverser(t *testing.T) {
blockChain.VerifyWholeChainWithTypes(ctx, inProgressChan)
close(done)
}()
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks())
checkTraverseSequence(ctx, t, traverser, blockChain.AllBlocks(), nil)
close(inProgressChan)
testutil.AssertDoesReceive(ctx, t, done, "should have completed verification but did not")
})

t.Run("errors correctly, with budget", func(t *testing.T) {
store := make(map[ipld.Link][]byte)
persistence := testutil.NewTestStore(store)
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
traverser := TraversalBuilder{
Root: blockChain.TipLink,
Selector: blockChain.Selector(),
Chooser: blockChain.Chooser,
LinkSystem: persistence,
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
return nil
},
Budget: &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: 6,
},
}.Start(ctx)
var path ipld.Path
for i := 0; i < 6; i++ {
path = path.AppendSegment(ipld.PathSegmentOfString("Parents"))
path = path.AppendSegment(ipld.PathSegmentOfInt(0))
}
checkTraverseSequence(ctx, t, traverser, blockChain.Blocks(0, 6), &traversal.ErrBudgetExceeded{BudgetKind: "link", Path: path, Link: blockChain.LinkTipIndex(6)})
})

t.Run("errors correctly, no budget", func(t *testing.T) {
store := make(map[ipld.Link][]byte)
persistence := testutil.NewTestStore(store)
blockChain := testutil.SetupBlockChain(ctx, t, persistence, 100, 10)
traverser := TraversalBuilder{
Root: blockChain.TipLink,
Selector: blockChain.Selector(),
Chooser: blockChain.Chooser,
LinkSystem: persistence,
Visitor: func(tp traversal.Progress, node ipld.Node, r traversal.VisitReason) error {
return nil
},
Budget: &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: 0,
},
}.Start(ctx)
checkTraverseSequence(ctx, t, traverser, []blocks.Block{}, &traversal.ErrBudgetExceeded{BudgetKind: "link", Link: blockChain.TipLink})
})
}

func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block) {
func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverser, expectedBlks []blocks.Block, finalErr error) {
for _, blk := range expectedBlks {
isComplete, err := traverser.IsComplete()
require.False(t, isComplete)
Expand All @@ -104,5 +150,9 @@ func checkTraverseSequence(ctx context.Context, t *testing.T, traverser Traverse
}
isComplete, err := traverser.IsComplete()
require.True(t, isComplete)
require.NoError(t, err)
if finalErr == nil {
require.NoError(t, err)
} else {
require.EqualError(t, err, finalErr.Error())
}
}
22 changes: 13 additions & 9 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type inProgressRequestStatus struct {
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
traverser ipldutil.Traverser
traverserCancel context.CancelFunc
}

// PeerHandler is an interface that can send requests to peers
Expand All @@ -87,15 +88,16 @@ type AsyncLoader interface {
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
linkSystem ipld.LinkSystem
connManager network.ConnManager
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
linkSystem ipld.LinkSystem
connManager network.ConnManager
maxLinksPerRequest uint64

// dont touch out side of run loop
nextRequestID graphsync.RequestID
Expand Down Expand Up @@ -129,6 +131,7 @@ func New(ctx context.Context,
networkErrorListeners *listeners.NetworkErrorListeners,
requestQueue taskqueue.TaskQueue,
connManager network.ConnManager,
maxLinksPerRequest uint64,
) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
Expand All @@ -145,6 +148,7 @@ func New(ctx context.Context,
networkErrorListeners: networkErrorListeners,
requestQueue: requestQueue,
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
}
}

Expand Down
2 changes: 1 addition & 1 deletion requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
td.networkErrorListeners = listeners.NewNetworkErrorListeners()
td.taskqueue = taskqueue.NewTaskQueue(ctx)
lsys := cidlink.DefaultLinkSystem()
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm)
td.requestManager = New(ctx, td.fal, lsys, td.requestHooks, td.responseHooks, td.networkErrorListeners, td.taskqueue, td.tcm, 0)
td.executor = executor.NewExecutor(td.requestManager, td.blockHooks, td.fal.AsyncLoad)
td.requestManager.SetDelegate(td.fph)
td.requestManager.Startup()
Expand Down
18 changes: 16 additions & 2 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,24 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
var initialRequest bool
if ipr.traverser == nil {
initialRequest = true
var budget *traversal.Budget
if rm.maxLinksPerRequest > 0 {
budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(rm.maxLinksPerRequest),
}
}
// the traverser has its own context because we want to fail on block boundaries, in the executor,
// and make sure all blocks included up to the termination message
// are processed and passed in the response channel
ctx, cancel := context.WithCancel(rm.ctx)
ipr.traverserCancel = cancel
ipr.traverser = ipldutil.TraversalBuilder{
Root: cidlink.Link{Cid: ipr.request.Root()},
Selector: ipr.request.Selector(),
Visitor: func(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
select {
case <-ipr.ctx.Done():
case <-ctx.Done():
case ipr.inProgressChan <- graphsync.ResponseProgress{
Node: node,
Path: tp.Path,
Expand All @@ -118,7 +130,8 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re
},
Chooser: ipr.nodeStyleChooser,
LinkSystem: rm.linkSystem,
}.Start(ipr.ctx)
Budget: budget,
}.Start(ctx)
}

ipr.state = running
Expand Down Expand Up @@ -157,6 +170,7 @@ func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID, ipr *i
ipr.cancelFn()
rm.asyncLoader.CleanupRequest(requestID)
if ipr.traverser != nil {
ipr.traverserCancel()
ipr.traverser.Shutdown(rm.ctx)
}
// make sure context is not closed before closing channels (could cause send
Expand Down
3 changes: 3 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type ResponseManager struct {
inProgressResponses map[responseKey]*inProgressResponseStatus
maxInProcessRequests uint64
connManager network.ConnManager
maxLinksPerRequest uint64
}

// New creates a new response manager for responding to requests
Expand All @@ -172,6 +173,7 @@ func New(ctx context.Context,
networkErrorListeners NetworkErrorListeners,
maxInProcessRequests uint64,
connManager network.ConnManager,
maxLinksPerRequest uint64,
) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
messages := make(chan responseManagerMessage, 16)
Expand All @@ -194,6 +196,7 @@ func New(ctx context.Context,
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
maxInProcessRequests: maxInProcessRequests,
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
}
rm.qe = &queryExecutor{
blockHooks: blockHooks,
Expand Down
Loading