diff --git a/ipld/car/car_test.go b/ipld/car/car_test.go index 137edd629..0a9c80fed 100644 --- a/ipld/car/car_test.go +++ b/ipld/car/car_test.go @@ -1,4 +1,4 @@ -package car +package car_test import ( "bytes" @@ -12,10 +12,7 @@ import ( format "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" dstest "github.com/ipfs/go-merkledag/test" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipld/go-ipld-prime/traversal/selector" - "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "github.com/stretchr/testify/require" + car "github.com/ipld/go-car" ) func assertAddNodes(t *testing.T, ds format.DAGService, nds ...format.Node) { @@ -46,12 +43,12 @@ func TestRoundtrip(t *testing.T) { assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3) buf := new(bytes.Buffer) - if err := WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil { + if err := car.WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil { t.Fatal(err) } bserv := dstest.Bserv() - ch, err := LoadCar(bserv.Blockstore(), buf) + ch, err := car.LoadCar(bserv.Blockstore(), buf) if err != nil { t.Fatal(err) } @@ -77,102 +74,6 @@ func TestRoundtrip(t *testing.T) { } } -func TestRoundtripSelective(t *testing.T) { - sourceBserv := dstest.Bserv() - sourceBs := sourceBserv.Blockstore() - dserv := merkledag.NewDAGService(sourceBserv) - a := merkledag.NewRawNode([]byte("aaaa")) - b := merkledag.NewRawNode([]byte("bbbb")) - c := merkledag.NewRawNode([]byte("cccc")) - - nd1 := &merkledag.ProtoNode{} - nd1.AddNodeLink("cat", a) - - nd2 := &merkledag.ProtoNode{} - nd2.AddNodeLink("first", nd1) - nd2.AddNodeLink("dog", b) - nd2.AddNodeLink("repeat", nd1) - - nd3 := &merkledag.ProtoNode{} - nd3.AddNodeLink("second", nd2) - nd3.AddNodeLink("bear", c) - - assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3) - - ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) - - // the graph assembled above looks as follows, in order: - // nd3 -> [c, nd2 -> 
[nd1 -> a, b, nd1 -> a]] - // this selector starts at n3, and traverses a link at index 1 (nd2, the second link, zero indexed) - // it then recursively traverses all of its children - // the only node skipped is 'c' -- link at index 0 immediately below nd3 - // the purpose is simply to show we are not writing the entire merkledag underneath - // nd3 - selector := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { - efsb.Insert("Links", - ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) - }).Node() - - sc := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: selector}}) - - // write car in one step - buf := new(bytes.Buffer) - blockCount := 0 - var oneStepBlocks []Block - err := sc.Write(buf, func(block Block) error { - oneStepBlocks = append(oneStepBlocks, block) - blockCount++ - return nil - }) - require.Equal(t, blockCount, 5) - require.NoError(t, err) - - // create a new builder for two-step write - sc2 := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: selector}}) - - // write car in two steps - var twoStepBlocks []Block - scp, err := sc2.Prepare(func(block Block) error { - twoStepBlocks = append(twoStepBlocks, block) - return nil - }) - require.NoError(t, err) - buf2 := new(bytes.Buffer) - err = scp.Dump(buf2) - require.NoError(t, err) - - // verify preparation step correctly assesed length and blocks - require.Equal(t, scp.Size(), uint64(buf.Len())) - require.Equal(t, len(scp.Cids()), blockCount) - - // verify equal data written by both methods - require.Equal(t, buf.Bytes(), buf2.Bytes()) - - // verify equal blocks were passed to user block hook funcs - require.Equal(t, oneStepBlocks, twoStepBlocks) - - // readout car and verify contents - bserv := dstest.Bserv() - ch, err := LoadCar(bserv.Blockstore(), buf) - require.NoError(t, err) - require.Equal(t, len(ch.Roots), 1) - - require.True(t, 
ch.Roots[0].Equals(nd3.Cid())) - - bs := bserv.Blockstore() - for _, nd := range []format.Node{a, b, nd1, nd2, nd3} { - has, err := bs.Has(nd.Cid()) - require.NoError(t, err) - require.True(t, has) - } - - for _, nd := range []format.Node{c} { - has, err := bs.Has(nd.Cid()) - require.NoError(t, err) - require.False(t, has) - } -} - func TestEOFHandling(t *testing.T) { // fixture is a clean single-block, single-root CAR fixture, err := hex.DecodeString("3aa265726f6f747381d82a58250001711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80b6776657273696f6e012c01711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80ba165646f646779f5") @@ -180,8 +81,8 @@ func TestEOFHandling(t *testing.T) { t.Fatal(err) } - load := func(t *testing.T, byts []byte) *CarReader { - cr, err := NewCarReader(bytes.NewReader(byts)) + load := func(t *testing.T, byts []byte) *car.CarReader { + cr, err := car.NewCarReader(bytes.NewReader(byts)) if err != nil { t.Fatal(err) } @@ -294,7 +195,7 @@ func TestBadHeaders(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = NewCarReader(bytes.NewReader(fixture)) + _, err = car.NewCarReader(bytes.NewReader(fixture)) return err } diff --git a/ipld/car/options.go b/ipld/car/options.go new file mode 100644 index 000000000..e317f9cc9 --- /dev/null +++ b/ipld/car/options.go @@ -0,0 +1,51 @@ +package car + +import "math" + +// options holds the configured options after applying a number of +// Option funcs. +type options struct { + TraverseLinksOnlyOnce bool + MaxTraversalLinks uint64 +} + +// Option describes an option which affects behavior when +// interacting with the interface. +type Option func(*options) + +// TraverseLinksOnlyOnce prevents the traversal engine from repeatedly visiting +// the same links more than once. +// +// This can be an efficient strategy for an exhaustive selector where it's known +// that repeat visits won't impact the completeness of execution. 
However it +// should be used with caution with most other selectors as repeat visits of +// links for different reasons during selector execution can be valid and +// necessary to perform full traversal. +func TraverseLinksOnlyOnce() Option { + return func(sco *options) { + sco.TraverseLinksOnlyOnce = true + } +} + +// MaxTraversalLinks changes the allowed number of links a selector traversal +// can execute before failing. +// +// Note that setting this option may cause an error to be returned from selector +// execution when building a SelectiveCar. +func MaxTraversalLinks(MaxTraversalLinks uint64) Option { + return func(sco *options) { + sco.MaxTraversalLinks = MaxTraversalLinks + } +} + +// applyOptions applies given opts and returns the resulting options. +func applyOptions(opt ...Option) options { + opts := options{ + TraverseLinksOnlyOnce: false, // default: recurse until exhausted + MaxTraversalLinks: math.MaxInt64, // default: traverse all + } + for _, o := range opt { + o(&opts) + } + return opts +} diff --git a/ipld/car/options_test.go b/ipld/car/options_test.go new file mode 100644 index 000000000..250c67203 --- /dev/null +++ b/ipld/car/options_test.go @@ -0,0 +1,27 @@ +package car + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestApplyOptions_SetsExpectedDefaults(t *testing.T) { + require.Equal(t, options{ + MaxTraversalLinks: math.MaxInt64, + TraverseLinksOnlyOnce: false, + }, applyOptions()) +} + +func TestApplyOptions_AppliesOptions(t *testing.T) { + require.Equal(t, + options{ + MaxTraversalLinks: 123, + TraverseLinksOnlyOnce: true, + }, + applyOptions( + MaxTraversalLinks(123), + TraverseLinksOnlyOnce(), + )) +} diff --git a/ipld/car/selectivecar.go b/ipld/car/selectivecar.go index 50a955206..9b5bd8cef 100644 --- a/ipld/car/selectivecar.go +++ b/ipld/car/selectivecar.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" cid "github.com/ipfs/go-cid" util "github.com/ipld/go-car/util" @@ -40,6 +41,7 @@ 
type SelectiveCar struct { ctx context.Context dags []Dag store ReadStore + opts options } // OnCarHeaderFunc is called during traversal when the header is created @@ -61,16 +63,16 @@ type SelectiveCarPrepared struct { // NewSelectiveCar creates a new SelectiveCar for the given car file based // a block store and set of root+selector pairs -func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag) SelectiveCar { +func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag, opts ...Option) SelectiveCar { return SelectiveCar{ ctx: ctx, store: store, dags: dags, + opts: applyOptions(opts...), } } func (sc SelectiveCar) traverse(onCarHeader OnCarHeaderFunc, onNewCarBlock OnNewCarBlockFunc) (uint64, error) { - traverser := &selectiveCarTraverser{onCarHeader, onNewCarBlock, 0, cid.NewSet(), sc, cidlink.DefaultLinkSystem()} traverser.lsys.StorageReadOpener = traverser.loader return traverser.traverse() @@ -264,13 +266,21 @@ func (sct *selectiveCarTraverser) traverseBlocks() error { if err != nil { return err } - err = traversal.Progress{ + prog := traversal.Progress{ Cfg: &traversal.Config{ Ctx: sct.sc.ctx, LinkSystem: sct.lsys, LinkTargetNodePrototypeChooser: nsc, + LinkVisitOnlyOnce: sct.sc.opts.TraverseLinksOnlyOnce, }, - }.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) + } + if sct.sc.opts.MaxTraversalLinks < math.MaxInt64 { + prog.Budget = &traversal.Budget{ + NodeBudget: math.MaxInt64, + LinkBudget: int64(sct.sc.opts.MaxTraversalLinks), + } + } + err = prog.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }) if err != nil { return err } diff --git a/ipld/car/selectivecar_test.go b/ipld/car/selectivecar_test.go new file mode 100644 index 000000000..387203ff8 --- /dev/null +++ b/ipld/car/selectivecar_test.go @@ -0,0 +1,227 @@ +package car_test + +import ( + "bytes" + "context" + "testing" + + blocks "github.com/ipfs/go-block-format" + cid 
"github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + dstest "github.com/ipfs/go-merkledag/test" + car "github.com/ipld/go-car" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/stretchr/testify/require" +) + +func TestRoundtripSelective(t *testing.T) { + sourceBserv := dstest.Bserv() + sourceBs := sourceBserv.Blockstore() + dserv := merkledag.NewDAGService(sourceBserv) + a := merkledag.NewRawNode([]byte("aaaa")) + b := merkledag.NewRawNode([]byte("bbbb")) + c := merkledag.NewRawNode([]byte("cccc")) + + nd1 := &merkledag.ProtoNode{} + nd1.AddNodeLink("cat", a) + + nd2 := &merkledag.ProtoNode{} + nd2.AddNodeLink("first", nd1) + nd2.AddNodeLink("dog", b) + nd2.AddNodeLink("repeat", nd1) + + nd3 := &merkledag.ProtoNode{} + nd3.AddNodeLink("second", nd2) + nd3.AddNodeLink("bear", c) + + assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3) + + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + + // the graph assembled above looks as follows, in order: + // nd3 -> [c, nd2 -> [nd1 -> a, b, nd1 -> a]] + // this selector starts at nd3, and traverses a link at index 1 (nd2, the second link, zero indexed) + // it then recursively traverses all of its children + // the only node skipped is 'c' -- link at index 0 immediately below nd3 + // the purpose is simply to show we are not writing the entire merkledag underneath + // nd3 + selector := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { + efsb.Insert("Links", + ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())))) + }).Node() + + sc := car.NewSelectiveCar(context.Background(), sourceBs, []car.Dag{{Root: nd3.Cid(), Selector: selector}}) + + // write car in one step + buf := 
new(bytes.Buffer) + blockCount := 0 + var oneStepBlocks []car.Block + err := sc.Write(buf, func(block car.Block) error { + oneStepBlocks = append(oneStepBlocks, block) + blockCount++ + return nil + }) + require.Equal(t, blockCount, 5) + require.NoError(t, err) + + // create a new builder for two-step write + sc2 := car.NewSelectiveCar(context.Background(), sourceBs, []car.Dag{{Root: nd3.Cid(), Selector: selector}}) + + // write car in two steps + var twoStepBlocks []car.Block + scp, err := sc2.Prepare(func(block car.Block) error { + twoStepBlocks = append(twoStepBlocks, block) + return nil + }) + require.NoError(t, err) + buf2 := new(bytes.Buffer) + err = scp.Dump(buf2) + require.NoError(t, err) + + // verify preparation step correctly assessed length and blocks + require.Equal(t, scp.Size(), uint64(buf.Len())) + require.Equal(t, len(scp.Cids()), blockCount) + + // verify equal data written by both methods + require.Equal(t, buf.Bytes(), buf2.Bytes()) + + // verify equal blocks were passed to user block hook funcs + require.Equal(t, oneStepBlocks, twoStepBlocks) + + // read out car and verify contents + bserv := dstest.Bserv() + ch, err := car.LoadCar(bserv.Blockstore(), buf) + require.NoError(t, err) + require.Equal(t, len(ch.Roots), 1) + + require.True(t, ch.Roots[0].Equals(nd3.Cid())) + + bs := bserv.Blockstore() + for _, nd := range []format.Node{a, b, nd1, nd2, nd3} { + has, err := bs.Has(nd.Cid()) + require.NoError(t, err) + require.True(t, has) + } + + for _, nd := range []format.Node{c} { + has, err := bs.Has(nd.Cid()) + require.NoError(t, err) + require.False(t, has) + } +} + +func TestNoLinkRepeatSelective(t *testing.T) { + sourceBserv := dstest.Bserv() + sourceBs := countingReadStore{bs: sourceBserv.Blockstore()} + dserv := merkledag.NewDAGService(sourceBserv) + a := merkledag.NewRawNode([]byte("aaaa")) + b := merkledag.NewRawNode([]byte("bbbb")) + c := merkledag.NewRawNode([]byte("cccc")) + + nd1 := &merkledag.ProtoNode{} + nd1.AddNodeLink("cat", a) + + 
nd2 := &merkledag.ProtoNode{} + nd2.AddNodeLink("first", nd1) + nd2.AddNodeLink("dog", b) + nd2.AddNodeLink("repeat", nd1) + + nd3 := &merkledag.ProtoNode{} + nd3.AddNodeLink("second", nd2) + nd3.AddNodeLink("bear", c) + nd3.AddNodeLink("bearagain1", c) + nd3.AddNodeLink("bearagain2", c) + nd3.AddNodeLink("bearagain3", c) + + assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3) + + t.Run("TraverseLinksOnlyOnce off", func(t *testing.T) { + sourceBs.count = 0 + sc := car.NewSelectiveCar(context.Background(), + &sourceBs, + []car.Dag{{Root: nd3.Cid(), Selector: selectorparse.CommonSelector_ExploreAllRecursively}}, + ) + + buf := new(bytes.Buffer) + blockCount := 0 + err := sc.Write(buf, func(block car.Block) error { + blockCount++ + return nil + }) + require.Equal(t, blockCount, 6) + require.Equal(t, sourceBs.count, 11) // with TraverseLinksOnlyOnce off, we expect repeat block visits because our DAG has repeat links + require.NoError(t, err) + }) + + t.Run("TraverseLinksOnlyOnce on", func(t *testing.T) { + sourceBs.count = 0 + + sc := car.NewSelectiveCar(context.Background(), + &sourceBs, + []car.Dag{{Root: nd3.Cid(), Selector: selectorparse.CommonSelector_ExploreAllRecursively}}, + car.TraverseLinksOnlyOnce(), + ) + + buf := new(bytes.Buffer) + blockCount := 0 + err := sc.Write(buf, func(block car.Block) error { + blockCount++ + return nil + }) + require.Equal(t, blockCount, 6) + require.Equal(t, sourceBs.count, 6) // only 6 blocks to load, no duplicate loading expected + require.NoError(t, err) + }) +} + +func TestLinkLimitSelective(t *testing.T) { + sourceBserv := dstest.Bserv() + sourceBs := sourceBserv.Blockstore() + dserv := merkledag.NewDAGService(sourceBserv) + a := merkledag.NewRawNode([]byte("aaaa")) + b := merkledag.NewRawNode([]byte("bbbb")) + c := merkledag.NewRawNode([]byte("cccc")) + + nd1 := &merkledag.ProtoNode{} + nd1.AddNodeLink("cat", a) + + nd2 := &merkledag.ProtoNode{} + nd2.AddNodeLink("first", nd1) + nd2.AddNodeLink("dog", b) + 
nd2.AddNodeLink("repeat", nd1) + + nd3 := &merkledag.ProtoNode{} + nd3.AddNodeLink("second", nd2) + nd3.AddNodeLink("bear", c) + + assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3) + + sc := car.NewSelectiveCar(context.Background(), + sourceBs, + []car.Dag{{Root: nd3.Cid(), Selector: selectorparse.CommonSelector_ExploreAllRecursively}}, + car.MaxTraversalLinks(2)) + + buf := new(bytes.Buffer) + blockCount := 0 + err := sc.Write(buf, func(block car.Block) error { + blockCount++ + return nil + }) + require.Equal(t, blockCount, 3) // root + 2 + require.Error(t, err) + require.Regexp(t, "^traversal budget exceeded: budget for links reached zero while on path .*", err) +} + +type countingReadStore struct { + bs car.ReadStore + count int +} + +func (rs *countingReadStore) Get(c cid.Cid) (blocks.Block, error) { + rs.count++ + return rs.bs.Get(c) +}