Skip to content

Commit

Permalink
Merge pull request ipld/go-car#251 from ipld/rvagg/selective-traversal-options
Browse files Browse the repository at this point in the history

Expose selector traversal options for SelectiveCar

This commit was moved from ipld/go-car@c93f536
  • Loading branch information
hannahhoward authored Oct 1, 2021
2 parents 501b841 + c27f485 commit b373fc4
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 110 deletions.
113 changes: 7 additions & 106 deletions ipld/car/car_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package car
package car_test

import (
"bytes"
Expand All @@ -12,10 +12,7 @@ import (
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
car "github.com/ipld/go-car"
)

func assertAddNodes(t *testing.T, ds format.DAGService, nds ...format.Node) {
Expand Down Expand Up @@ -46,12 +43,12 @@ func TestRoundtrip(t *testing.T) {
assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3)

buf := new(bytes.Buffer)
if err := WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil {
if err := car.WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil {
t.Fatal(err)
}

bserv := dstest.Bserv()
ch, err := LoadCar(bserv.Blockstore(), buf)
ch, err := car.LoadCar(bserv.Blockstore(), buf)
if err != nil {
t.Fatal(err)
}
Expand All @@ -77,111 +74,15 @@ func TestRoundtrip(t *testing.T) {
}
}

// TestRoundtripSelective builds a small merkledag, writes a selector-limited
// CAR from it twice (once with Write, once with Prepare followed by Dump) and
// verifies that both paths produce identical bytes and block callbacks. It
// then loads the CAR back and checks that every selected block is present and
// the one block outside the selector ('c') is absent.
func TestRoundtripSelective(t *testing.T) {
	sourceBserv := dstest.Bserv()
	sourceBs := sourceBserv.Blockstore()
	dserv := merkledag.NewDAGService(sourceBserv)
	a := merkledag.NewRawNode([]byte("aaaa"))
	b := merkledag.NewRawNode([]byte("bbbb"))
	c := merkledag.NewRawNode([]byte("cccc"))

	nd1 := &merkledag.ProtoNode{}
	nd1.AddNodeLink("cat", a)

	nd2 := &merkledag.ProtoNode{}
	nd2.AddNodeLink("first", nd1)
	nd2.AddNodeLink("dog", b)
	nd2.AddNodeLink("repeat", nd1)

	nd3 := &merkledag.ProtoNode{}
	nd3.AddNodeLink("second", nd2)
	nd3.AddNodeLink("bear", c)

	assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3)

	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)

	// the graph assembled above looks as follows, in order:
	// nd3 -> [c, nd2 -> [nd1 -> a, b, nd1 -> a]]
	// this selector starts at nd3, and traverses a link at index 1 (nd2, the second link, zero indexed)
	// it then recursively traverses all of its children
	// the only node skipped is 'c' -- link at index 0 immediately below nd3
	// the purpose is simply to show we are not writing the entire merkledag underneath
	// nd3
	//
	// The variable is named sel (not selector) so it does not shadow the
	// imported selector package for the remainder of the function.
	sel := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
		efsb.Insert("Links",
			ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
	}).Node()

	sc := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in one step
	buf := new(bytes.Buffer)
	blockCount := 0
	var oneStepBlocks []Block
	err := sc.Write(buf, func(block Block) error {
		oneStepBlocks = append(oneStepBlocks, block)
		blockCount++
		return nil
	})
	// Check the error first: a failed write would otherwise surface as a
	// misleading block-count mismatch.
	require.NoError(t, err)
	require.Equal(t, 5, blockCount)

	// create a new builder for two-step write
	sc2 := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in two steps
	var twoStepBlocks []Block
	scp, err := sc2.Prepare(func(block Block) error {
		twoStepBlocks = append(twoStepBlocks, block)
		return nil
	})
	require.NoError(t, err)
	buf2 := new(bytes.Buffer)
	err = scp.Dump(buf2)
	require.NoError(t, err)

	// verify preparation step correctly assessed length and blocks
	require.Equal(t, uint64(buf.Len()), scp.Size())
	require.Equal(t, blockCount, len(scp.Cids()))

	// verify equal data written by both methods
	require.Equal(t, buf.Bytes(), buf2.Bytes())

	// verify equal blocks were passed to user block hook funcs
	require.Equal(t, oneStepBlocks, twoStepBlocks)

	// readout car and verify contents
	bserv := dstest.Bserv()
	ch, err := LoadCar(bserv.Blockstore(), buf)
	require.NoError(t, err)
	require.Equal(t, 1, len(ch.Roots))

	require.True(t, ch.Roots[0].Equals(nd3.Cid()))

	bs := bserv.Blockstore()
	// every node reachable through the selector must have been written
	for _, nd := range []format.Node{a, b, nd1, nd2, nd3} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.True(t, has)
	}

	// 'c' sits outside the selector and must not have been written
	for _, nd := range []format.Node{c} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.False(t, has)
	}
}

func TestEOFHandling(t *testing.T) {
// fixture is a clean single-block, single-root CAR
fixture, err := hex.DecodeString("3aa265726f6f747381d82a58250001711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80b6776657273696f6e012c01711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80ba165646f646779f5")
if err != nil {
t.Fatal(err)
}

load := func(t *testing.T, byts []byte) *CarReader {
cr, err := NewCarReader(bytes.NewReader(byts))
load := func(t *testing.T, byts []byte) *car.CarReader {
cr, err := car.NewCarReader(bytes.NewReader(byts))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -294,7 +195,7 @@ func TestBadHeaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, err = NewCarReader(bytes.NewReader(fixture))
_, err = car.NewCarReader(bytes.NewReader(fixture))
return err
}

Expand Down
51 changes: 51 additions & 0 deletions ipld/car/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package car

import "math"

// options holds the configured options after applying a number of
// Option funcs.
type options struct {
// TraverseLinksOnlyOnce, when true, requests that each link be visited
// at most once during traversal. It is set by the TraverseLinksOnlyOnce
// Option.
TraverseLinksOnlyOnce bool
// MaxTraversalLinks is the number of links a selector traversal may
// execute before failing. It is set by the MaxTraversalLinks Option.
MaxTraversalLinks uint64
}

// Option describes an option which affects behavior when
// interacting with the interface.
//
// An Option is a function that mutates the options value it is handed;
// zero or more of them may be passed to NewSelectiveCar.
type Option func(*options)

// TraverseLinksOnlyOnce returns an Option that stops the traversal engine from
// visiting the same link more than once.
//
// For an exhaustive selector, where revisiting a link cannot change which
// blocks are reached, this can save work. With most other selectors it should
// be used with caution: the same link may legitimately be reached for
// different reasons during selector execution, and skipping the repeat visit
// can leave the traversal incomplete.
func TraverseLinksOnlyOnce() Option {
	enable := func(o *options) {
		o.TraverseLinksOnlyOnce = true
	}
	return enable
}

// MaxTraversalLinks returns an Option that changes the allowed number of links
// a selector traversal can execute before failing.
//
// Note that setting this option may cause an error to be returned from selector
// execution when building a SelectiveCar.
//
// SelectiveCar only installs a link budget when the configured value is below
// math.MaxInt64; values at or above that threshold leave the traversal without
// a link budget.
func MaxTraversalLinks(maxLinks uint64) Option {
	// The parameter is deliberately not named MaxTraversalLinks: that would
	// shadow this function inside its own body.
	return func(sco *options) {
		sco.MaxTraversalLinks = maxLinks
	}
}

// applyOptions starts from the package defaults, runs every supplied Option
// against them in argument order, and returns the resulting options value.
func applyOptions(opt ...Option) options {
	var cfg options
	cfg.TraverseLinksOnlyOnce = false      // default: recurse until exhausted
	cfg.MaxTraversalLinks = math.MaxInt64 // default: traverse all

	for i := range opt {
		opt[i](&cfg)
	}
	return cfg
}
27 changes: 27 additions & 0 deletions ipld/car/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package car

import (
"math"
"testing"

"github.com/stretchr/testify/require"
)

func TestApplyOptions_SetsExpectedDefaults(t *testing.T) {
require.Equal(t, options{
MaxTraversalLinks: math.MaxInt64,
TraverseLinksOnlyOnce: false,
}, applyOptions())
}

func TestApplyOptions_AppliesOptions(t *testing.T) {
require.Equal(t,
options{
MaxTraversalLinks: 123,
TraverseLinksOnlyOnce: true,
},
applyOptions(
MaxTraversalLinks(123),
TraverseLinksOnlyOnce(),
))
}
18 changes: 14 additions & 4 deletions ipld/car/selectivecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"

cid "github.com/ipfs/go-cid"
util "github.com/ipld/go-car/util"
Expand Down Expand Up @@ -40,6 +41,7 @@ type SelectiveCar struct {
ctx context.Context
dags []Dag
store ReadStore
opts options
}

// OnCarHeaderFunc is called during traversal when the header is created
Expand All @@ -61,16 +63,16 @@ type SelectiveCarPrepared struct {

// NewSelectiveCar creates a new SelectiveCar for the given car file based
// a block store and set of root+selector pairs
func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag) SelectiveCar {
func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag, opts ...Option) SelectiveCar {
return SelectiveCar{
ctx: ctx,
store: store,
dags: dags,
opts: applyOptions(opts...),
}
}

func (sc SelectiveCar) traverse(onCarHeader OnCarHeaderFunc, onNewCarBlock OnNewCarBlockFunc) (uint64, error) {

traverser := &selectiveCarTraverser{onCarHeader, onNewCarBlock, 0, cid.NewSet(), sc, cidlink.DefaultLinkSystem()}
traverser.lsys.StorageReadOpener = traverser.loader
return traverser.traverse()
Expand Down Expand Up @@ -264,13 +266,21 @@ func (sct *selectiveCarTraverser) traverseBlocks() error {
if err != nil {
return err
}
err = traversal.Progress{
prog := traversal.Progress{
Cfg: &traversal.Config{
Ctx: sct.sc.ctx,
LinkSystem: sct.lsys,
LinkTargetNodePrototypeChooser: nsc,
LinkVisitOnlyOnce: sct.sc.opts.TraverseLinksOnlyOnce,
},
}.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
}
if sct.sc.opts.MaxTraversalLinks < math.MaxInt64 {
prog.Budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(sct.sc.opts.MaxTraversalLinks),
}
}
err = prog.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit b373fc4

Please sign in to comment.