diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go
index 40e2c70ca..46a2f94a5 100644
--- a/pinning/pinner/dspinner/pin_test.go
+++ b/pinning/pinner/dspinner/pin_test.go
@@ -18,7 +18,6 @@ import (
 	blockstore "github.com/ipfs/go-ipfs-blockstore"
 	offline "github.com/ipfs/go-ipfs-exchange-offline"
 	ipfspin "github.com/ipfs/go-ipfs-pinner"
-	"github.com/ipfs/go-ipfs-pinner/ipldpinner"
 	util "github.com/ipfs/go-ipfs-util"
 	ipld "github.com/ipfs/go-ipld-format"
 	logging "github.com/ipfs/go-log"
@@ -951,19 +950,11 @@ func BenchmarkNthPin(b *testing.B) {
 	if err != nil {
 		panic(err.Error())
 	}
-	pinnerIPLD, err := ipldpinner.New(dstore, dserv, dserv)
-	if err != nil {
-		panic(err.Error())
-	}
 
 	for count := 1000; count <= 10000; count += 1000 {
 		b.Run(fmt.Sprint("PinDS-", count), func(b *testing.B) {
 			benchmarkNthPin(b, count, pinner, dserv)
 		})
-
-		b.Run(fmt.Sprint("PinIPLD-", count), func(b *testing.B) {
-			benchmarkNthPin(b, count, pinnerIPLD, dserv)
-		})
 	}
 }
 
@@ -1011,15 +1002,6 @@ func BenchmarkNPins(b *testing.B) {
 			}
 			benchmarkNPins(b, count, pinner, dserv)
 		})
-
-		b.Run(fmt.Sprint("PinIPLD-", count), func(b *testing.B) {
-			dstore, dserv := makeStore()
-			pinner, err := ipldpinner.New(dstore, dserv, dserv)
-			if err != nil {
-				panic(err.Error())
-			}
-			benchmarkNPins(b, count, pinner, dserv)
-		})
 	}
 }
 
@@ -1061,15 +1043,6 @@ func BenchmarkNUnpins(b *testing.B) {
 			}
 			benchmarkNUnpins(b, count, pinner, dserv)
 		})
-
-		b.Run(fmt.Sprint("UninIPLD-", count), func(b *testing.B) {
-			dstore, dserv := makeStore()
-			pinner, err := ipldpinner.New(dstore, dserv, dserv)
-			if err != nil {
-				panic(err.Error())
-			}
-			benchmarkNUnpins(b, count, pinner, dserv)
-		})
 	}
 }
 
@@ -1111,15 +1084,6 @@ func BenchmarkPinAll(b *testing.B) {
 			}
 			benchmarkPinAll(b, count, pinner, dserv)
 		})
-
-		b.Run(fmt.Sprint("PinAllIPLD-", count), func(b *testing.B) {
-			dstore, dserv := makeStore()
-			pinner, err := ipldpinner.New(dstore, dserv, dserv)
-			if err != nil {
-				panic(err.Error())
-			}
-			benchmarkPinAll(b, count, pinner, dserv)
-		})
 	}
 }
 
diff --git a/pinning/pinner/ipldpinner/pin.go b/pinning/pinner/ipldpinner/pin.go
deleted file mode 100644
index 562083698..000000000
--- a/pinning/pinner/ipldpinner/pin.go
+++ /dev/null
@@ -1,549 +0,0 @@
-// Package ipldpinner implements structures and methods to keep track of
-// which objects a user wants to keep stored locally. This implementation
-// stores pin information in a mdag structure.
-package ipldpinner - -import ( - "context" - "fmt" - "os" - "sync" - "time" - - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - ipld "github.com/ipfs/go-ipld-format" - logging "github.com/ipfs/go-log" - "github.com/ipfs/go-merkledag" - mdag "github.com/ipfs/go-merkledag" - "github.com/ipfs/go-merkledag/dagutils" - - ipfspinner "github.com/ipfs/go-ipfs-pinner" -) - -const loadTimeout = 5 * time.Second - -var log = logging.Logger("pin") - -var pinDatastoreKey = ds.NewKey("/local/pins") - -var emptyKey cid.Cid - -var linkDirect, linkRecursive, linkInternal string - -func init() { - e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n") - if err != nil { - log.Error("failed to decode empty key constant") - os.Exit(1) - } - emptyKey = e - - directStr, ok := ipfspinner.ModeToString(ipfspinner.Direct) - if !ok { - panic("could not find Direct pin enum") - } - linkDirect = directStr - - recursiveStr, ok := ipfspinner.ModeToString(ipfspinner.Recursive) - if !ok { - panic("could not find Recursive pin enum") - } - linkRecursive = recursiveStr - - internalStr, ok := ipfspinner.ModeToString(ipfspinner.Internal) - if !ok { - panic("could not find Internal pin enum") - } - linkInternal = internalStr -} - -// pinner implements the Pinner interface -type pinner struct { - lock sync.RWMutex - recursePin *cid.Set - directPin *cid.Set - - // Track the keys used for storing the pinning state, so gc does - // not delete them. - internalPin *cid.Set - dserv ipld.DAGService - internal ipld.DAGService // dagservice used to store internal objects - dstore ds.Datastore -} - -var _ ipfspinner.Pinner = (*pinner)(nil) - -type syncDAGService interface { - ipld.DAGService - Sync() error -} - -// New creates a new pinner using the given datastore as a backend, and loads -// the pinner's keysets from the datastore -func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error) { - rootKey, err := dstore.Get(pinDatastoreKey) - if err != nil { - if err == ds.ErrNotFound { - return &pinner{ - recursePin: cid.NewSet(), - directPin: cid.NewSet(), - internalPin: cid.NewSet(), - dserv: dserv, - internal: internal, - dstore: dstore, - }, nil - } - return nil, err - } - rootCid, err := cid.Cast(rootKey) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(context.TODO(), loadTimeout) - defer cancel() - - root, err := internal.Get(ctx, rootCid) - if err != nil { - return nil, fmt.Errorf("cannot find pinning root object: %v", err) - } - - rootpb, ok := root.(*mdag.ProtoNode) - if !ok { - return nil, mdag.ErrNotProtobuf - } - - internalset := cid.NewSet() - internalset.Add(rootCid) - recordInternal := internalset.Add - - // load recursive set - recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load recursive pins: %v", err) - } - - // load direct set - directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load direct pins: %v", err) - } - - return &pinner{ - // assign pinsets - recursePin: cidSetWithValues(recurseKeys), - directPin: cidSetWithValues(directKeys), - internalPin: internalset, - // assign services - dserv: dserv, - dstore: dstore, - internal: internal, - }, nil -} - -// LoadKeys reads the pinned CIDs and sends them on the given channel. This is -// used to read pins without loading them all into memory. 
-func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error { - rootKey, err := dstore.Get(pinDatastoreKey) - if err != nil { - if err == ds.ErrNotFound { - return nil - } - return err - } - rootCid, err := cid.Cast(rootKey) - if err != nil { - return err - } - - root, err := internal.Get(ctx, rootCid) - if err != nil { - return fmt.Errorf("cannot find pinning root object: %v", err) - } - - rootpb, ok := root.(*mdag.ProtoNode) - if !ok { - return mdag.ErrNotProtobuf - } - - var linkName string - if recursive { - linkName = linkRecursive - } else { - linkName = linkDirect - } - - return loadSetChan(ctx, internal, rootpb, linkName, keyChan) -} - -// Pin the given node, optionally recursive -func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { - err := p.dserv.Add(ctx, node) - if err != nil { - return err - } - - c := node.Cid() - - p.lock.Lock() - defer p.lock.Unlock() - - if recurse { - if p.recursePin.Has(c) { - return nil - } - - p.lock.Unlock() - // temporary unlock to fetch the entire graph - err := mdag.FetchGraph(ctx, c, p.dserv) - p.lock.Lock() - if err != nil { - return err - } - - if p.recursePin.Has(c) { - return nil - } - - if p.directPin.Has(c) { - p.directPin.Remove(c) - } - - p.recursePin.Add(c) - } else { - if p.recursePin.Has(c) { - return fmt.Errorf("%s already pinned recursively", c.String()) - } - - p.directPin.Add(c) - } - return nil -} - -// ErrNotPinned is returned when trying to unpin items which are not pinned. -var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly") - -// Unpin a given key -func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { - p.lock.Lock() - defer p.lock.Unlock() - if p.recursePin.Has(c) { - if !recursive { - return fmt.Errorf("%s is pinned recursively", c) - } - p.recursePin.Remove(c) - return nil - } - if p.directPin.Has(c) { - p.directPin.Remove(c) - return nil - } - return ErrNotPinned -} - -func (p *pinner) isInternalPin(c cid.Cid) bool { - return p.internalPin.Has(c) -} - -// IsPinned returns whether or not the given key is pinned -// and an explanation of why its pinned -func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { - p.lock.RLock() - defer p.lock.RUnlock() - return p.isPinnedWithType(ctx, c, ipfspinner.Any) -} - -// IsPinnedWithType returns whether or not the given cid is pinned with the -// given pin type, as well as returning the type of pin its pinned with. -func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { - p.lock.RLock() - defer p.lock.RUnlock() - return p.isPinnedWithType(ctx, c, mode) -} - -// isPinnedWithType is the implementation of IsPinnedWithType that does not lock. 
-// intended for use by other pinned methods that already take locks -func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { - switch mode { - case ipfspinner.Any, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal: - default: - err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}", - mode, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal, ipfspinner.Any) - return "", false, err - } - if (mode == ipfspinner.Recursive || mode == ipfspinner.Any) && p.recursePin.Has(c) { - return linkRecursive, true, nil - } - if mode == ipfspinner.Recursive { - return "", false, nil - } - - if (mode == ipfspinner.Direct || mode == ipfspinner.Any) && p.directPin.Has(c) { - return linkDirect, true, nil - } - if mode == ipfspinner.Direct { - return "", false, nil - } - - if (mode == ipfspinner.Internal || mode == ipfspinner.Any) && p.isInternalPin(c) { - return linkInternal, true, nil - } - if mode == ipfspinner.Internal { - return "", false, nil - } - - // Default is Indirect - visitedSet := cid.NewSet() - for _, rc := range p.recursePin.Keys() { - has, err := hasChild(ctx, p.dserv, rc, c, visitedSet.Visit) - if err != nil { - return "", false, err - } - if has { - return rc.String(), true, nil - } - } - return "", false, nil -} - -// CheckIfPinned Checks if a set of keys are pinned, more efficient than -// calling IsPinned for each key, returns the pinned status of cid(s) -func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { - p.lock.RLock() - defer p.lock.RUnlock() - pinned := make([]ipfspinner.Pinned, 0, len(cids)) - toCheck := cid.NewSet() - - // First check for non-Indirect pins directly - for _, c := range cids { - if p.recursePin.Has(c) { - pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Recursive}) - } else if p.directPin.Has(c) { - pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Direct}) - } else if p.isInternalPin(c) { - pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Internal}) - } else { - toCheck.Add(c) - } - } - - // Now walk all recursive pins to check for indirect pins - visited := cid.NewSet() - for _, rk := range p.recursePin.Keys() { - err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool { - if toCheck.Len() == 0 || !visited.Visit(c) { - return false - } - - if toCheck.Has(c) { - pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk}) - toCheck.Remove(c) - } - - return true - }, merkledag.Concurrent()) - if err != nil { - return nil, err - } - if toCheck.Len() == 0 { - break - } - } - - // Anything left in toCheck is not pinned - for _, k := range toCheck.Keys() { - pinned = append(pinned, ipfspinner.Pinned{Key: k, Mode: ipfspinner.NotPinned}) - } - - return pinned, nil -} - -// RemovePinWithMode is for manually editing the pin structure. -// Use with care! If used improperly, garbage collection may not -// be successful. 
-func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) { - p.lock.Lock() - defer p.lock.Unlock() - switch mode { - case ipfspinner.Direct: - p.directPin.Remove(c) - case ipfspinner.Recursive: - p.recursePin.Remove(c) - default: - // programmer error, panic OK - panic("unrecognized pin type") - } -} - -func cidSetWithValues(cids []cid.Cid) *cid.Set { - out := cid.NewSet() - for _, c := range cids { - out.Add(c) - } - return out -} - -// DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.directPin.Keys(), nil -} - -// RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - return p.recursePin.Keys(), nil -} - -// Update updates a recursive pin from one cid to another -// this is more efficient than simply pinning the new one and unpinning the -// old one -func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { - if from == to { - // Nothing to do. Don't remove this check or we'll end up - // _removing_ the pin. - // - // See #6648 - return nil - } - - p.lock.Lock() - defer p.lock.Unlock() - - if !p.recursePin.Has(from) { - return fmt.Errorf("'from' cid was not recursively pinned already") - } - - // Temporarily unlock while we fetch the differences. - p.lock.Unlock() - err := dagutils.DiffEnumerate(ctx, p.dserv, from, to) - p.lock.Lock() - - if err != nil { - return err - } - - p.recursePin.Add(to) - if unpin { - p.recursePin.Remove(from) - } - return nil -} - -// Flush encodes and writes pinner keysets to the datastore -func (p *pinner) Flush(ctx context.Context) error { - p.lock.Lock() - defer p.lock.Unlock() - - internalset := cid.NewSet() - recordInternal := internalset.Add - - root := &mdag.ProtoNode{} - { - n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal) - if err != nil { - return err - } - if err := root.AddNodeLink(linkDirect, n); err != nil { - return err - } - } - - { - n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal) - if err != nil { - return err - } - if err := root.AddNodeLink(linkRecursive, n); err != nil { - return err - } - } - - // add the empty node, its referenced by the pin sets but never created - err := p.internal.Add(ctx, new(mdag.ProtoNode)) - if err != nil { - return err - } - - err = p.internal.Add(ctx, root) - if err != nil { - return err - } - - k := root.Cid() - - internalset.Add(k) - - if syncDServ, ok := p.dserv.(syncDAGService); ok { - if err := syncDServ.Sync(); err != nil { - return fmt.Errorf("cannot sync pinned data: %v", err) - } - } - - if syncInternal, ok := p.internal.(syncDAGService); ok { - if err := syncInternal.Sync(); err != nil { - return fmt.Errorf("cannot sync pinning data: %v", err) - } - } - - if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil { - return fmt.Errorf("cannot store pin state: %v", err) - } - if err := p.dstore.Sync(pinDatastoreKey); err != nil { - return fmt.Errorf("cannot sync pin state: %v", err) - } - p.internalPin = internalset - return nil -} - -// InternalPins returns all cids kept pinned for the internal state of the -// pinner -func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) { - p.lock.Lock() - defer p.lock.Unlock() - return p.internalPin.Keys(), nil -} - -// PinWithMode allows the user to have fine grained control over pin -// 
counts -func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) { - p.lock.Lock() - defer p.lock.Unlock() - switch mode { - case ipfspinner.Recursive: - p.recursePin.Add(c) - case ipfspinner.Direct: - p.directPin.Add(c) - } -} - -// hasChild recursively looks for a Cid among the children of a root Cid. -// The visit function can be used to shortcut already-visited branches. -func hasChild(ctx context.Context, ng ipld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) { - links, err := ipld.GetLinks(ctx, ng, root) - if err != nil { - return false, err - } - for _, lnk := range links { - c := lnk.Cid - if lnk.Cid.Equals(child) { - return true, nil - } - if visit(c) { - has, err := hasChild(ctx, ng, c, child, visit) - if err != nil { - return false, err - } - - if has { - return has, nil - } - } - } - return false, nil -} diff --git a/pinning/pinner/ipldpinner/pin_test.go b/pinning/pinner/ipldpinner/pin_test.go deleted file mode 100644 index 3c61d41fd..000000000 --- a/pinning/pinner/ipldpinner/pin_test.go +++ /dev/null @@ -1,526 +0,0 @@ -package ipldpinner - -import ( - "context" - "io" - "testing" - "time" - - bs "github.com/ipfs/go-blockservice" - mdag "github.com/ipfs/go-merkledag" - - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - pin "github.com/ipfs/go-ipfs-pinner" - util "github.com/ipfs/go-ipfs-util" -) - -var rand = util.NewTimeSeededRand() - -func randNode() (*mdag.ProtoNode, cid.Cid) { - nd := new(mdag.ProtoNode) - nd.SetData(make([]byte, 32)) - _, err := io.ReadFull(rand, nd.Data()) - if err != nil { - panic(err) - } - k := nd.Cid() - return nd, k -} - -func assertPinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) { - _, pinned, err := p.IsPinned(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - if !pinned { - t.Fatal(failmsg) - } -} - -func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) { - _, pinned, err := p.IsPinned(context.Background(), c) - if err != nil { - t.Fatal(err) - } - - if pinned { - t.Fatal(failmsg) - } -} - -func TestPinnerBasic(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - - dserv := mdag.NewDAGService(bserv) - - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - - a, ak := randNode() - err = dserv.Add(ctx, a) - if err != nil { - t.Fatal(err) - } - - // Pin A{} - err = p.Pin(ctx, a, false) - if err != nil { - t.Fatal(err) - } - - assertPinned(t, p, ak, "Failed to find key") - - // create new node c, to be indirectly pinned through b - c, _ := randNode() - err = dserv.Add(ctx, c) - if err != nil { - t.Fatal(err) - } - ck := c.Cid() - - // Create new node b, to be parent to a and c - b, _ := randNode() - err = b.AddNodeLink("child", a) - if err != nil { - t.Fatal(err) - } - - err = b.AddNodeLink("otherchild", c) - if err != nil { - t.Fatal(err) - } - - err = dserv.Add(ctx, b) - if err != nil { - t.Fatal(err) - } - bk := b.Cid() - - // recursively pin B{A,C} - err = p.Pin(ctx, b, true) - if err != nil { - t.Fatal(err) - } - - assertPinned(t, p, ck, "child of recursively pinned node not found") - - assertPinned(t, p, bk, "Recursively pinned node not found..") - - d, _ := randNode() - _ = 
d.AddNodeLink("a", a) - _ = d.AddNodeLink("c", c) - - e, _ := randNode() - _ = d.AddNodeLink("e", e) - - // Must be in dagserv for unpin to work - err = dserv.Add(ctx, e) - if err != nil { - t.Fatal(err) - } - err = dserv.Add(ctx, d) - if err != nil { - t.Fatal(err) - } - - // Add D{A,C,E} - err = p.Pin(ctx, d, true) - if err != nil { - t.Fatal(err) - } - - dk := d.Cid() - assertPinned(t, p, dk, "pinned node not found.") - - // Test recursive unpin - err = p.Unpin(ctx, dk, true) - if err != nil { - t.Fatal(err) - } - - err = p.Flush(ctx) - if err != nil { - t.Fatal(err) - } - - np, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - - // Test directly pinned - assertPinned(t, np, ak, "Could not find pinned node!") - - // Test recursively pinned - assertPinned(t, np, bk, "could not find recursively pinned node") - - // Test that LoadKeys returns the expected CIDs. - keyChan := make(chan cid.Cid) - go func() { - err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan) - close(keyChan) - }() - keys := map[cid.Cid]struct{}{} - for c := range keyChan { - keys[c] = struct{}{} - } - if err != nil { - t.Fatal(err) - } - recKeys, _ := np.RecursiveKeys(ctx) - if len(keys) != len(recKeys) { - t.Fatal("wrong number of recursive keys from LoadKeys") - } - for _, k := range recKeys { - if _, ok := keys[k]; !ok { - t.Fatal("LoadKeys did not return correct recursive keys") - } - } - - keyChan = make(chan cid.Cid) - go func() { - err = LoadKeys(ctx, dstore, dserv, dserv, false, keyChan) - close(keyChan) - }() - keys = map[cid.Cid]struct{}{} - for c := range keyChan { - keys[c] = struct{}{} - } - if err != nil { - t.Fatal(err) - } - dirKeys, _ := np.DirectKeys(ctx) - if len(keys) != len(dirKeys) { - t.Fatal("wrong number of direct keys from LoadKeys") - } - for _, k := range dirKeys { - if _, ok := keys[k]; !ok { - t.Fatal("LoadKeys did not return correct direct keys") - } - } - - cancel() - emptyDS := dssync.MutexWrap(ds.NewMapDatastore()) - - // Check key not in datastore - err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil) - if err != nil { - t.Fatal(err) - } - - // Check error on bad key - if err = emptyDS.Put(pinDatastoreKey, []byte("bad-cid")); err != nil { - panic(err) - } - if err = emptyDS.Sync(pinDatastoreKey); err != nil { - panic(err) - } - if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil { - t.Fatal("expected error") - } - - // Lookup dag that does not exist - noKey, err := cid.Decode("QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1") - if err != nil { - panic(err) - } - if err = emptyDS.Put(pinDatastoreKey, noKey.Bytes()); err != nil { - panic(err) - } - if err = emptyDS.Sync(pinDatastoreKey); err != nil { - panic(err) - } - err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil) - if err == nil || err.Error() != "cannot find pinning root object: merkledag: not found" { - t.Fatal("did not get expected error") - } - - // Check error when node has no links - if err = emptyDS.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil { - panic(err) - } - if err = emptyDS.Sync(pinDatastoreKey); err != nil { - panic(err) - } - if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil { - t.Fatal("expected error") - } -} - -func TestIsPinnedLookup(t *testing.T) { - // We are going to test that lookups work in pins which share - // the same branches. 
For that we will construct this tree: - // - // A5->A4->A3->A2->A1->A0 - // / / - // B------- / - // \ / - // C--------------- - // - // We will ensure that IsPinned works for all objects both when they - // are pinned and once they have been unpinned. - aBranchLen := 6 - if aBranchLen < 3 { - t.Fatal("set aBranchLen to at least 3") - } - - ctx := context.Background() - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - - dserv := mdag.NewDAGService(bserv) - - // TODO does pinner need to share datastore with blockservice? - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - - aNodes := make([]*mdag.ProtoNode, aBranchLen) - aKeys := make([]cid.Cid, aBranchLen) - for i := 0; i < aBranchLen; i++ { - a, _ := randNode() - if i >= 1 { - err := a.AddNodeLink("child", aNodes[i-1]) - if err != nil { - t.Fatal(err) - } - } - - err := dserv.Add(ctx, a) - if err != nil { - t.Fatal(err) - } - //t.Logf("a[%d] is %s", i, ak) - aNodes[i] = a - aKeys[i] = a.Cid() - } - - // Pin A5 recursively - if err := p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil { - t.Fatal(err) - } - - // Create node B and add A3 as child - b, _ := randNode() - if err := b.AddNodeLink("mychild", aNodes[3]); err != nil { - t.Fatal(err) - } - - // Create C node - c, _ := randNode() - // Add A0 as child of C - if err := c.AddNodeLink("child", aNodes[0]); err != nil { - t.Fatal(err) - } - - // Add C - err = dserv.Add(ctx, c) - if err != nil { - t.Fatal(err) - } - ck := c.Cid() - //t.Logf("C is %s", ck) - - // Add C to B and Add B - if err := b.AddNodeLink("myotherchild", c); err != nil { - t.Fatal(err) - } - err = dserv.Add(ctx, b) - if err != nil { - t.Fatal(err) - } - bk := b.Cid() - //t.Logf("B is %s", bk) - - // Pin C recursively - - if err := p.Pin(ctx, c, true); err != nil { - t.Fatal(err) - } - - // Pin B recursively - - if err := p.Pin(ctx, b, true); err != nil { - t.Fatal(err) - } - - assertPinned(t, p, aKeys[0], "A0 should be pinned") - assertPinned(t, p, aKeys[1], "A1 should be pinned") - assertPinned(t, p, ck, "C should be pinned") - assertPinned(t, p, bk, "B should be pinned") - - // Unpin A5 recursively - if err := p.Unpin(ctx, aKeys[5], true); err != nil { - t.Fatal(err) - } - - assertPinned(t, p, aKeys[0], "A0 should still be pinned through B") - assertUnpinned(t, p, aKeys[4], "A4 should be unpinned") - - // Unpin B recursively - if err := p.Unpin(ctx, bk, true); err != nil { - t.Fatal(err) - } - assertUnpinned(t, p, bk, "B should be unpinned") - assertUnpinned(t, p, aKeys[1], "A1 should be unpinned") - assertPinned(t, p, aKeys[0], "A0 should still be pinned through C") -} - -func TestDuplicateSemantics(t *testing.T) { - ctx := context.Background() - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - - dserv := mdag.NewDAGService(bserv) - - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - - a, _ := randNode() - err = dserv.Add(ctx, a) - if err != nil { - t.Fatal(err) - } - - // pin is recursively - err = p.Pin(ctx, a, true) - if err != nil { - t.Fatal(err) - } - - // pinning directly should fail - err = p.Pin(ctx, a, false) - if err == nil { - t.Fatal("expected direct pin to fail") - } - - // pinning recursively again should succeed - err = p.Pin(ctx, a, true) - if err != nil { - t.Fatal(err) - } -} - -func TestFlush(t *testing.T) { - dstore := 
dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - - dserv := mdag.NewDAGService(bserv) - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - _, k := randNode() - - p.PinWithMode(k, pin.Recursive) - if err := p.Flush(context.Background()); err != nil { - t.Fatal(err) - } - assertPinned(t, p, k, "expected key to still be pinned") -} - -func TestPinRecursiveFail(t *testing.T) { - ctx := context.Background() - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - dserv := mdag.NewDAGService(bserv) - - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - - a, _ := randNode() - b, _ := randNode() - err = a.AddNodeLink("child", b) - if err != nil { - t.Fatal(err) - } - - // NOTE: This isnt a time based test, we expect the pin to fail - mctx, cancel := context.WithTimeout(ctx, time.Millisecond) - defer cancel() - - err = p.Pin(mctx, a, true) - if err == nil { - t.Fatal("should have failed to pin here") - } - - err = dserv.Add(ctx, b) - if err != nil { - t.Fatal(err) - } - - err = dserv.Add(ctx, a) - if err != nil { - t.Fatal(err) - } - - // this one is time based... but shouldnt cause any issues - mctx, cancel = context.WithTimeout(ctx, time.Second) - defer cancel() - err = p.Pin(mctx, a, true) - if err != nil { - t.Fatal(err) - } -} - -func TestPinUpdate(t *testing.T) { - ctx := context.Background() - - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - - dserv := mdag.NewDAGService(bserv) - p, err := New(dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - n1, c1 := randNode() - n2, c2 := randNode() - - if err := dserv.Add(ctx, n1); err != nil { - t.Fatal(err) - } - if err := dserv.Add(ctx, n2); err != nil { - t.Fatal(err) - } - - if err := p.Pin(ctx, n1, true); err != nil { - t.Fatal(err) - } - - if err := p.Update(ctx, c1, c2, true); err != nil { - t.Fatal(err) - } - - assertPinned(t, p, c2, "c2 should be pinned now") - assertUnpinned(t, p, c1, "c1 should no longer be pinned") - - if err := p.Update(ctx, c2, c1, false); err != nil { - t.Fatal(err) - } - - assertPinned(t, p, c2, "c2 should be pinned still") - assertPinned(t, p, c1, "c1 should be pinned now") -} diff --git a/pinning/pinner/ipldpinner/set.go b/pinning/pinner/ipldpinner/set.go deleted file mode 100644 index 51951a2c0..000000000 --- a/pinning/pinner/ipldpinner/set.go +++ /dev/null @@ -1,334 +0,0 @@ -package ipldpinner - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "fmt" - "hash/fnv" - "sort" - - "github.com/gogo/protobuf/proto" - cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - - "github.com/ipfs/go-ipfs-pinner/internal/pb" -) - -const ( - // defaultFanout specifies the default number of fan-out links per layer - defaultFanout = 256 - - // maxItems is the maximum number of items that will fit in a single bucket - maxItems = 8192 -) - -func hash(seed uint32, c cid.Cid) uint32 { - var buf [4]byte - binary.LittleEndian.PutUint32(buf[:], seed) - h := fnv.New32a() - _, _ = h.Write(buf[:]) - _, _ = h.Write(c.Bytes()) - return h.Sum32() -} - -type itemIterator func() (c cid.Cid, ok bool) - -type keyObserver func(cid.Cid) - -type sortByHash struct { - links []*ipld.Link -} - -func (s sortByHash) Len() int { - return len(s.links) -} - -func 
(s sortByHash) Less(a, b int) bool { - return bytes.Compare(s.links[a].Cid.Bytes(), s.links[b].Cid.Bytes()) == -1 -} - -func (s sortByHash) Swap(a, b int) { - s.links[a], s.links[b] = s.links[b], s.links[a] -} - -func storeItems(ctx context.Context, dag ipld.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { - // Each node wastes up to defaultFanout in empty links. - var leafLinks uint64 - if estimatedLen < maxItems { - leafLinks = estimatedLen - } - links := make([]*ipld.Link, defaultFanout, defaultFanout+leafLinks) - for i := 0; i < defaultFanout; i++ { - links[i] = &ipld.Link{Cid: emptyKey} - } - - // add emptyKey to our set of internal pinset objects - n := &merkledag.ProtoNode{} - n.SetLinks(links) - - internalKeys(emptyKey) - - hdr := &pb.Set{ - Version: 1, - Fanout: defaultFanout, - Seed: depth, - } - if err := writeHdr(n, hdr); err != nil { - return nil, err - } - - if estimatedLen < maxItems { - // it'll probably fit - links := n.Links() - for i := 0; i < maxItems; i++ { - k, ok := iter() - if !ok { - // all done - break - } - - links = append(links, &ipld.Link{Cid: k}) - } - - n.SetLinks(links) - - // sort by hash, also swap item Data - s := sortByHash{ - links: n.Links()[defaultFanout:], - } - sort.Stable(s) - } - - var hashed [][]cid.Cid - for { - // This loop essentially enumerates every single item in the set - // and maps them all into a set of buckets. Each bucket will be recursively - // turned into its own sub-set, and so on down the chain. Each sub-set - // gets added to the dagservice, and put into its place in a set nodes - // links array. - // - // Previously, the bucket was selected by taking an int32 from the hash of - // the input key + seed. This was erroneous as we would later be assigning - // the created sub-sets into an array of length 256 by the modulus of the - // int32 hash value with 256. This resulted in overwriting existing sub-sets - // and losing pins. The fix (a few lines down from this comment), is to - // map the hash value down to the 8 bit keyspace here while creating the - // buckets. This way, we avoid any overlapping later on. 
- k, ok := iter() - if !ok { - break - } - if hashed == nil { - hashed = make([][]cid.Cid, defaultFanout) - } - h := hash(depth, k) % defaultFanout - hashed[h] = append(hashed[h], k) - } - - for h, items := range hashed { - if len(items) == 0 { - // recursion base case - continue - } - - childIter := getCidListIterator(items) - - // recursively create a pinset from the items for this bucket index - child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys) - if err != nil { - return nil, err - } - - size, err := child.Size() - if err != nil { - return nil, err - } - - err = dag.Add(ctx, child) - if err != nil { - return nil, err - } - childKey := child.Cid() - - internalKeys(childKey) - - // overwrite the 'empty key' in the existing links array - n.Links()[h] = &ipld.Link{ - Cid: childKey, - Size: size, - } - } - return n, nil -} - -func readHdr(n *merkledag.ProtoNode) (*pb.Set, error) { - hdrLenRaw, consumed := binary.Uvarint(n.Data()) - if consumed <= 0 { - return nil, errors.New("invalid Set header length") - } - - pbdata := n.Data()[consumed:] - if hdrLenRaw > uint64(len(pbdata)) { - return nil, errors.New("impossibly large Set header length") - } - // as hdrLenRaw was <= an int, we now know it fits in an int - hdrLen := int(hdrLenRaw) - var hdr pb.Set - if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil { - return nil, err - } - - if v := hdr.GetVersion(); v != 1 { - return nil, fmt.Errorf("unsupported Set version: %d", v) - } - if uint64(hdr.GetFanout()) > uint64(len(n.Links())) { - return nil, errors.New("impossibly large Fanout") - } - return &hdr, nil -} - -func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error { - hdrData, err := proto.Marshal(hdr) - if err != nil { - return err - } - - // make enough space for the length prefix and the marshaled header data - data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData)) - - // write the uvarint length of the header data - uvarlen := binary.PutUvarint(data, uint64(len(hdrData))) - - // append the actual protobuf data *after* the length value we wrote - data = append(data[:uvarlen], hdrData...) 
- - n.SetData(data) - return nil -} - -type walkerFunc func(idx int, link *ipld.Link) error - -func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error { - hdr, err := readHdr(n) - if err != nil { - return err - } - // readHdr guarantees fanout is a safe value - fanout := hdr.GetFanout() - for i, l := range n.Links()[fanout:] { - if err = fn(i, l); err != nil { - return err - } - } - for _, l := range n.Links()[:fanout] { - c := l.Cid - if children != nil { - children(c) - } - if c.Equals(emptyKey) { - continue - } - subtree, err := l.GetNode(ctx, dag) - if err != nil { - return err - } - - stpb, ok := subtree.(*merkledag.ProtoNode) - if !ok { - return merkledag.ErrNotProtobuf - } - - if err = walkItems(ctx, dag, stpb, fn, children); err != nil { - return err - } - } - return nil -} - -func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]cid.Cid, error) { - l, err := root.GetNodeLink(name) - if err != nil { - return nil, err - } - - lnkc := l.Cid - internalKeys(lnkc) - - n, err := l.GetNode(ctx, dag) - if err != nil { - return nil, err - } - - pbn, ok := n.(*merkledag.ProtoNode) - if !ok { - return nil, merkledag.ErrNotProtobuf - } - - var res []cid.Cid - walk := func(idx int, link *ipld.Link) error { - res = append(res, link.Cid) - return nil - } - - if err := walkItems(ctx, dag, pbn, walk, internalKeys); err != nil { - return nil, err - } - return res, nil -} - -func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error { - l, err := root.GetNodeLink(name) - if err != nil { - return err - } - - n, err := l.GetNode(ctx, dag) - if err != nil { - return err - } - - pbn, ok := n.(*merkledag.ProtoNode) - if !ok { - return merkledag.ErrNotProtobuf - } - - walk := func(idx int, link *ipld.Link) error { - keyChan <- link.Cid - return nil - } - - if err = walkItems(ctx, dag, pbn, walk, nil); err != nil { - return err - } - return nil -} - -func getCidListIterator(cids []cid.Cid) itemIterator { - return func() (c cid.Cid, ok bool) { - if len(cids) == 0 { - return cid.Cid{}, false - } - - first := cids[0] - cids = cids[1:] - return first, true - } -} - -func storeSet(ctx context.Context, dag ipld.DAGService, cids []cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { - iter := getCidListIterator(cids) - - n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys) - if err != nil { - return nil, err - } - err = dag.Add(ctx, n) - if err != nil { - return nil, err - } - internalKeys(n.Cid()) - return n, nil -} diff --git a/pinning/pinner/ipldpinner/set_test.go b/pinning/pinner/ipldpinner/set_test.go deleted file mode 100644 index 0f32e6b5e..000000000 --- a/pinning/pinner/ipldpinner/set_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package ipldpinner - -import ( - "context" - "encoding/binary" - "testing" - - bserv "github.com/ipfs/go-blockservice" - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - dag "github.com/ipfs/go-merkledag" -) - -func ignoreCids(_ cid.Cid) {} - -func objCount(d ds.Datastore) int { - q := dsq.Query{KeysOnly: true} - res, err := d.Query(q) - if err != nil { - panic(err) - } - - var count int - for { - _, ok := res.NextSync() - if !ok { - break - } - - count++ - } - return count -} - -func 
TestSet(t *testing.T) { - dst := ds.NewMapDatastore() - bstore := blockstore.NewBlockstore(dst) - ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore))) - - // this value triggers the creation of a recursive shard. - // If the recursive sharding is done improperly, this will result in - // an infinite recursion and crash (OOM) - limit := uint32((defaultFanout * maxItems) + 1) - - var inputs []cid.Cid - buf := make([]byte, 4) - for i := uint32(0); i < limit; i++ { - binary.BigEndian.PutUint32(buf, i) - c := dag.NewRawNode(buf).Cid() - inputs = append(inputs, c) - } - - _, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids) - if err != nil { - t.Fatal(err) - } - - objs1 := objCount(dst) - - out, err := storeSet(context.Background(), ds, inputs, ignoreCids) - if err != nil { - t.Fatal(err) - } - - objs2 := objCount(dst) - if objs2-objs1 > 2 { - t.Fatal("set sharding does not appear to be deterministic") - } - - // weird wrapper node because loadSet expects us to pass an - // object pointing to multiple named sets - setroot := &dag.ProtoNode{} - err = setroot.AddNodeLink("foo", out) - if err != nil { - t.Fatal(err) - } - - outset, err := loadSet(context.Background(), ds, setroot, "foo", ignoreCids) - if err != nil { - t.Fatal(err) - } - - if uint32(len(outset)) != limit { - t.Fatal("got wrong number", len(outset), limit) - } - - seen := cid.NewSet() - for _, c := range outset { - seen.Add(c) - } - - for _, c := range inputs { - if !seen.Has(c) { - t.Fatalf("expected to have '%s', didnt find it", c) - } - } -} diff --git a/pinning/pinner/pinconv/pinconv.go b/pinning/pinner/pinconv/pinconv.go deleted file mode 100644 index df21f85b0..000000000 --- a/pinning/pinner/pinconv/pinconv.go +++ /dev/null @@ -1,127 +0,0 @@ -// Package pinconv converts pins between the dag-based ipldpinner and the -// datastore-based dspinner. Once conversion is complete, the pins from the -// source pinner are removed. -package pinconv - -import ( - "context" - "fmt" - - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - ipfspinner "github.com/ipfs/go-ipfs-pinner" - "github.com/ipfs/go-ipfs-pinner/dspinner" - "github.com/ipfs/go-ipfs-pinner/ipldpinner" - ipld "github.com/ipfs/go-ipld-format" -) - -// ConvertPinsFromIPLDToDS converts pins stored in mdag based storage to pins -// stores in the datastore. Returns a dspinner loaded with the converted pins, -// and a count of the recursive and direct pins converted. -// -// After pins are stored in datastore, the root pin key is deleted to unlink -// the pin data in the DAGService. 
-func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) { - const ipldPinPath = "/local/pins" - - dsPinner, err := dspinner.New(ctx, dstore, dserv) - if err != nil { - return nil, 0, err - } - - var convCount int - keyChan := make(chan cid.Cid) - - go func() { - err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan) - close(keyChan) - }() - for key := range keyChan { - dsPinner.PinWithMode(key, ipfspinner.Recursive) - convCount++ - } - if err != nil { - return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err) - } - - keyChan = make(chan cid.Cid) - go func() { - err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan) - close(keyChan) - }() - for key := range keyChan { - dsPinner.PinWithMode(key, ipfspinner.Direct) - convCount++ - } - if err != nil { - return nil, 0, fmt.Errorf("cannot load direct keys: %s", err) - } - - err = dsPinner.Flush(ctx) - if err != nil { - return nil, 0, err - } - - // Delete root mdag key from datastore to remove old pin storage. - ipldPinDatastoreKey := ds.NewKey(ipldPinPath) - if err = dstore.Delete(ipldPinDatastoreKey); err != nil { - return nil, 0, fmt.Errorf("cannot delete old pin state: %v", err) - } - if err = dstore.Sync(ipldPinDatastoreKey); err != nil { - return nil, 0, fmt.Errorf("cannot sync old pin state: %v", err) - } - - return dsPinner, convCount, nil -} - -// ConvertPinsFromDSToIPLD converts the pins stored in the datastore by -// dspinner, into pins stored in the given internal DAGService by ipldpinner. -// Returns an ipldpinner loaded with the converted pins, and a count of the -// recursive and direct pins converted. -// -// After the pins are stored in the DAGService, the pins and their indexes are -// removed from the dspinner. 
-func ConvertPinsFromDSToIPLD(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) { - dsPinner, err := dspinner.New(ctx, dstore, dserv) - if err != nil { - return nil, 0, err - } - - ipldPinner, err := ipldpinner.New(dstore, dserv, internal) - if err != nil { - return nil, 0, err - } - - cids, err := dsPinner.RecursiveKeys(ctx) - if err != nil { - return nil, 0, err - } - for i := range cids { - ipldPinner.PinWithMode(cids[i], ipfspinner.Recursive) - dsPinner.RemovePinWithMode(cids[i], ipfspinner.Recursive) - } - convCount := len(cids) - - cids, err = dsPinner.DirectKeys(ctx) - if err != nil { - return nil, 0, err - } - for i := range cids { - ipldPinner.PinWithMode(cids[i], ipfspinner.Direct) - dsPinner.RemovePinWithMode(cids[i], ipfspinner.Direct) - } - convCount += len(cids) - - // Save the ipldpinner pins - err = ipldPinner.Flush(ctx) - if err != nil { - return nil, 0, err - } - - err = dsPinner.Flush(ctx) - if err != nil { - return nil, 0, err - } - - return ipldPinner, convCount, nil -} diff --git a/pinning/pinner/pinconv/pinconv_test.go b/pinning/pinner/pinconv/pinconv_test.go deleted file mode 100644 index 02abca61c..000000000 --- a/pinning/pinner/pinconv/pinconv_test.go +++ /dev/null @@ -1,179 +0,0 @@ -package pinconv - -import ( - "context" - "errors" - "io" - "strings" - "testing" - - bs "github.com/ipfs/go-blockservice" - cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - lds "github.com/ipfs/go-ds-leveldb" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - ipfspin "github.com/ipfs/go-ipfs-pinner" - "github.com/ipfs/go-ipfs-pinner/dspinner" - util "github.com/ipfs/go-ipfs-util" - ipld "github.com/ipfs/go-ipld-format" - mdag "github.com/ipfs/go-merkledag" -) - -var rand = util.NewTimeSeededRand() - -type batchWrap struct { - ds.Datastore -} - -func randNode() (*mdag.ProtoNode, cid.Cid) { - nd := new(mdag.ProtoNode) - nd.SetData(make([]byte, 32)) - _, err := io.ReadFull(rand, nd.Data()) - if err != nil { - panic(err) - } - k := nd.Cid() - return nd, k -} - -func (d *batchWrap) Batch() (ds.Batch, error) { - return ds.NewBasicBatch(d), nil -} - -func makeStore() (ds.Datastore, ipld.DAGService) { - ldstore, err := lds.NewDatastore("", nil) - if err != nil { - panic(err) - } - var dstore ds.Batching - dstore = &batchWrap{ldstore} - - bstore := blockstore.NewBlockstore(dstore) - bserv := bs.New(bstore, offline.Exchange(bstore)) - dserv := mdag.NewDAGService(bserv) - return dstore, dserv -} - -func TestConversions(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - dstore, dserv := makeStore() - - dsPinner, err := dspinner.New(ctx, dstore, dserv) - if err != nil { - t.Fatal(err) - } - - a, ak := randNode() - err = dsPinner.Pin(ctx, a, false) - if err != nil { - t.Fatal(err) - } - - // create new node c, to be indirectly pinned through b - c, ck := randNode() - dserv.Add(ctx, c) - - // Create new node b, to be parent to a and c - b, _ := randNode() - b.AddNodeLink("child", a) - b.AddNodeLink("otherchild", c) - bk := b.Cid() // CID changed after adding links - - // recursively pin B{A,C} - err = dsPinner.Pin(ctx, b, true) - if err != nil { - t.Fatal(err) - } - - err = dsPinner.Flush(ctx) - if err != nil { - t.Fatal(err) - } - - verifyPins := func(pinner ipfspin.Pinner) error { - pinned, err := pinner.CheckIfPinned(ctx, ak, bk, ck) - if err != nil { - return err - } - if len(pinned) != 3 { - return 
errors.New("incorrect number of results") - } - for _, pn := range pinned { - switch pn.Key { - case ak: - if pn.Mode != ipfspin.Direct { - return errors.New("A pinned with wrong mode") - } - case bk: - if pn.Mode != ipfspin.Recursive { - return errors.New("B pinned with wrong mode") - } - case ck: - if pn.Mode != ipfspin.Indirect { - return errors.New("C should be pinned indirectly") - } - if pn.Via != bk { - return errors.New("C should be pinned via B") - } - } - } - return nil - } - - err = verifyPins(dsPinner) - if err != nil { - t.Fatal(err) - } - - ipldPinner, toIPLDCount, err := ConvertPinsFromDSToIPLD(ctx, dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - if toIPLDCount != 2 { - t.Fatal("expected 2 ds-to-ipld pins, got", toIPLDCount) - } - - err = verifyPins(ipldPinner) - if err != nil { - t.Fatal(err) - } - - toDSPinner, toDSCount, err := ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv) - if err != nil { - t.Fatal(err) - } - if toDSCount != toIPLDCount { - t.Fatal("ds-to-ipld pins", toIPLDCount, "not equal to ipld-to-ds-pins", toDSCount) - } - - err = verifyPins(toDSPinner) - if err != nil { - t.Fatal(err) - } -} - -func TestConvertLoadError(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dstore, dserv := makeStore() - // Point /local/pins to empty node to cause failure loading pins. - pinDatastoreKey := ds.NewKey("/local/pins") - emptyKey, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n") - if err != nil { - panic(err) - } - if err = dstore.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil { - panic(err) - } - if err = dstore.Sync(pinDatastoreKey); err != nil { - panic(err) - } - - _, _, err = ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv) - if err == nil || !strings.HasPrefix(err.Error(), "cannot load recursive keys") { - t.Fatal("did not get expected error") - } -}