diff --git a/package.json b/package.json index 3194d7668f..40b047cc0e 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "bs58": "^3.0.0", "debug": "^2.2.0", "detect-node": "^2.0.3", + "fnv": "^0.1.3", "fs-blob-store": "^5.2.1", "glob": "^7.0.3", "hapi": "^13.4.1", @@ -123,4 +124,4 @@ "kumavis ", "nginnever " ] -} \ No newline at end of file +} diff --git a/src/core/index.js b/src/core/index.js index e6d1d3be32..bc7af321d4 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -17,6 +17,7 @@ const repo = require('./ipfs/repo') const init = require('./ipfs/init') const bootstrap = require('./ipfs/bootstrap') const config = require('./ipfs/config') +const pinner = require('./ipfs/pinner') const block = require('./ipfs/block') const object = require('./ipfs/object') const libp2p = require('./ipfs/libp2p') @@ -53,6 +54,7 @@ function IPFS (repoInstance) { this.init = init(this) this.bootstrap = bootstrap(this) this.config = config(this) + this.pinner = pinner(this) this.block = block(this) this.object = object(this) this.libp2p = libp2p(this) diff --git a/src/core/ipfs/pinner-utils.js b/src/core/ipfs/pinner-utils.js new file mode 100644 index 0000000000..6050514c42 --- /dev/null +++ b/src/core/ipfs/pinner-utils.js @@ -0,0 +1,270 @@ +'use strict' + +const bs58 = require('bs58') +const protobuf = require('protocol-buffers') +const crypto = require('crypto') +const fnv = require('fnv') +const mDAG = require('ipfs-merkle-dag') +const DAGNode = mDAG.DAGNode +const DAGLink = mDAG.DAGLink +const varint = require('varint') + +const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' +const emptyKey = new Buffer(bs58.decode(emptyKeyHash)) +const defaultFanout = 256 +const maxItems = 8192 + +// Protobuf interface +const pbSchema = ( + // from go-ipfs/pin/internal/pb/header.proto + 'message Set { ' + + // 1 for now + 'optional uint32 version = 1; ' + + // how many of the links are subtrees + 'optional uint32 fanout = 2; ' + + // hash seed for subtree selection, a random number + 'optional fixed32 seed = 3; ' + + '}' +) +const pb = protobuf(pbSchema) +function readHeader (rootNode) { + // rootNode.data should be a buffer of the format: + // < varint(headerLength) | header | itemData... > + var rootData = rootNode.data + var hdrLength = varint.decode(rootData) + var vBytes = varint.decode.bytes + if (vBytes <= 0) { + return { err: 'Invalid Set header length' } + } + if (vBytes + hdrLength > rootData.length) { + return { err: 'Impossibly large set header length' } + } + var hdrSlice = rootData.slice(vBytes, hdrLength + vBytes) + var header = pb.Set.decode(hdrSlice) + if (header.version !== 1) { + return { err: 'Unsupported Set version: ' + header.version } + } + if (header.fanout > rootNode.links.length) { + return { err: 'Impossibly large fanout' } + } + return { + header: header, + data: rootData.slice(hdrLength + vBytes) + } +} + +exports = module.exports = function (dagS) { + var pinnerUtils = { + // should this be part of `object` rather than `pinner`? + hasChild: (root, childhash, callback, _links, _checked, _seen) => { + // callback (err, has) + if (callback.fired) { return } + if (typeof childhash === 'object') { + childhash = bs58.encode(childhash).toString() + } + _links = _links || root.links.length + _checked = _checked || 0 + _seen = _seen || {} + + if (!root.links.length && _links === _checked) { + // all nodes have been checked + return callback(null, false) + } + root.links.forEach((link) => { + var bs58link = bs58.encode(link.hash).toString() + if (bs58link === childhash) { + callback.fired = true + return callback(null, true) + } + dagS.get(link.hash, (err, obj) => { + if (err) { + callback.fired = true + return callback(err) + } + // don't check the same links twice + if (bs58link in _seen) { return } + _seen[bs58link] = true + + _checked++ + _links += obj.links.length + pinnerUtils.hasChild(obj, childhash, callback, _links, _checked, _seen) + }) + }) + }, + + storeSet: (keys, logInternalKey, callback) => { + // callback (err, rootNode) + var items = keys.map((key) => { + return { + key: key, + data: null + } + }) + pinnerUtils.storeItems(items, logInternalKey, (err, rootNode) => { + if (err) { return callback(err) } + dagS.add(rootNode, (err) => { + if (err) { return callback(err) } + logInternalKey(rootNode.multihash()) + callback(null, rootNode) + }) + }) + }, + + storeItems: (items, logInternalKey, callback, _subcalls, _done) => { + // callback (err, rootNode) + var seed = crypto.randomBytes(4).readUInt32LE(0, true) + var pbHeader = pb.Set.encode({ + version: 1, + fanout: defaultFanout, + seed: seed + }) + var rootData = Buffer.concat([ + new Buffer(varint.encode(pbHeader.length)), pbHeader + ]) + var rootLinks = [] + var i + for (i = 0; i < defaultFanout; i++) { + rootLinks.push(new DAGLink('', null, emptyKey)) + } + logInternalKey(emptyKey) + + if (items.length <= maxItems) { + // the items will fit in a single root node + var itemLinks = [] + var itemData = [] + var indices = [] + for (i = 0; i < items.length; i++) { + itemLinks.push(new DAGLink('', null, items[i].key)) + itemData.push(items[i].data || new Buffer([])) + indices.push(i) + } + indices.sort((a, b) => { + var x = Buffer.compare(itemLinks[a].hash, itemLinks[b].hash) + if (x) { return x } + return (a < b ? -1 : 1) + }) + var sortedLinks = indices.map((i) => { return itemLinks[i] }) + var sortedData = indices.map((i) => { return itemData[i] }) + rootLinks = rootLinks.concat(sortedLinks) + rootData = Buffer.concat([rootData].concat(sortedData)) + readHeader(new DAGNode(rootData, rootLinks)) // + return callback(null, new DAGNode(rootData, rootLinks)) + } else { + // need to split up the items into multiple root nodes + // (using go-ipfs "wasteful but simple" approach for consistency) + _subcalls = _subcalls || 0 + _done = _done || 0 + var h + var hashed = {} + var hashFn = (seed, key) => { + var buf = new Buffer(4) + var h = new fnv.FNV() + buf.writeUInt32LE(seed, 0) + h.update(buf) + h.update(bs58.encode(key).toString()) + return h.value() + } + // items will be distributed among `defaultFanout` bins + for (i = 0; i < items.length; i++) { + h = hashFn(seed, items[i].key) % defaultFanout + hashed[h] = hashed[h] || [] + hashed[h].push(items[i]) + } + var storeItemsCb = (err, child) => { + if (callback.fired) { return } + if (err) { + callback.fired = true + return callback(err) + } + dagS.add(child, (err) => { + if (callback.fired) { return } + if (err) { + callback.fired = true + return callback(err) + } + logInternalKey(child.multihash()) + rootLinks[this.h] = new DAGLink( + '', child.size(), child.multihash() + ) + _done++ + if (_done === _subcalls) { + // all finished + return callback(null, new DAGNode(rootData, rootLinks)) + } + }) + } + _subcalls += Object.keys(hashed).length + for (h in hashed) { + if (hashed.hasOwnProperty(h)) { + pinnerUtils.storeItems( + hashed[h], + logInternalKey, + storeItemsCb.bind({h: h}), + _subcalls, + _done + ) + } + } + } + }, + + loadSet: (rootNode, name, logInternalKey, callback) => { + // callback (err, keys) + var link = rootNode.links.filter((link) => { + return link.name === name + }).pop() + if (!link) { return callback('No link found with name ' + name) } + logInternalKey(link.hash) + dagS.get(link.hash, (err, obj) => { + if (err) { return callback(err) } + var keys = [] + var walkerFn = (link) => { + keys.push(link.hash) + } + pinnerUtils.walkItems(obj, walkerFn, logInternalKey, (err) => { + if (err) { return callback(err) } + return callback(null, keys) + }) + }) + }, + + walkItems: (node, walkerFn, logInternalKey, callback) => { + // callback (err) + var h = readHeader(node) + if (h.err) { return callback(h.err) } + var fanout = h.header.fanout + var subwalks = 0 + var finished = 0 + + var walkCb = (err) => { + if (err) { return callback(err) } + finished++ + if (subwalks === finished) { + return callback() + } + } + + for (var i = 0; i < node.links.length; i++) { + var link = node.links[i] + if (i >= fanout) { + // item link + walkerFn(link, i, h.data) + } else { + // fanout link + logInternalKey(link.hash) + if (!emptyKey.equals(link.hash)) { + subwalks++ + dagS.get(link.hash, (err, obj) => { + if (err) { return callback(err) } + pinnerUtils.walkItems(obj, walkerFn, logInternalKey, walkCb) + }) + } + } + } + if (!subwalks) { + return callback() + } + } + } + return pinnerUtils +} diff --git a/src/core/ipfs/pinner.js b/src/core/ipfs/pinner.js new file mode 100644 index 0000000000..acc286646a --- /dev/null +++ b/src/core/ipfs/pinner.js @@ -0,0 +1,299 @@ +'use strict' + +const bs58 = require('bs58') +const mDAG = require('ipfs-merkle-dag') +const DAGNode = mDAG.DAGNode +const pinnerUtils = require('./pinner-utils') + +function KeySet (keys) { + // Buffers with identical data are still different objects, so + // they need to be cast to strings to prevent duplicates in Sets + this.keys = new Set() + this.keyStrings = new Set() + this.add = (key) => { + if (!this.has(key)) { + var keyString = bs58.encode(key).toString() + this.keyStrings.add(keyString) + this.keys.add(key) + } + } + this.delete = (key) => { + var keyString = bs58.encode(key).toString() + this.keyStrings.delete(keyString) + this.keys.delete(key) + } + this.clear = () => { + this.keys.clear() + this.keyStrings.clear() + } + this.has = (key) => { + return this.keyStrings.has(bs58.encode(key).toString()) + } + this.toArray = () => { + return Array.from(this.keys) + } + this.toStringArray = () => { + return Array.from(this.keyStrings) + } + keys = keys || [] + keys.forEach(this.add) +} + +module.exports = function (self) { + var directPins = new KeySet() + var recursivePins = new KeySet() + var internalPins = new KeySet() + + // repo.datastore makes a subfolder using first 8 chars of key, so + // pin data will be saved under /blocks/internal/internal_pins.data + const pinDataStoreKey = 'internal_pins' + + var repo = self._repo + var dagS = self._dagS + + var pinner = { + types: { + direct: 'direct', + recursive: 'recursive', + indirect: 'indirect', + internal: 'internal', + all: 'all' + }, + + clear: () => { + directPins.clear() + recursivePins.clear() + internalPins.clear() + }, + + utils: pinnerUtils(dagS), + + pin: (obj, recursive, callback) => { + // callback (err) + if (typeof recursive === 'function') { + callback = recursive + recursive = true + } + const multihash = obj.multihash() + if (recursive) { + if (recursivePins.has(multihash)) { + return callback(null) + } + directPins.delete(multihash) + dagS.getRecursive(multihash, (err, objs) => { + if (err) { + return callback(err) + } + recursivePins.add(multihash) + return callback(null) + }) + } else { + dagS.get(multihash, (err, obj) => { + if (err) { + return callback(err) + } + if (recursivePins.has(multihash)) { + return callback(bs58.encode(multihash).toString() + + ' already pinned recursively') + } + directPins.add(multihash) + return callback(null) + }) + } + }, + + unpin: (multihash, recursive, callback) => { + // callback (err) + if (typeof recursive === 'function') { + callback = recursive + recursive = true + } + pinner.isPinnedWithType(multihash, pinner.types.all, (err, pinned, reason) => { + if (err) { return callback(err) } + if (!pinned) { return callback('not pinned') } + switch (reason) { + case (pinner.types.recursive): + if (recursive) { + recursivePins.delete(multihash) + return callback(null) + } + return callback(bs58.encode(multihash).toString() + + ' is pinned recursively') + case (pinner.types.direct): + directPins.delete(multihash) + return callback(null) + default: + return callback(bs58.encode(multihash).toString() + + ' is pinned indirectly under ' + reason) + } + }) + }, + + isPinned: (multihash, callback) => { + // callback (err, pinned, reason) + pinner.isPinnedWithType(multihash, pinner.types.all, callback) + }, + + isPinnedWithType: (multihash, pinType, callback) => { + // callback (err, pinned, reason) + + // recursive + if ((pinType === pinner.types.recursive || pinType === pinner.types.all) && + recursivePins.has(multihash)) { + return callback(null, true, pinner.types.recursive) + } + if ((pinType === pinner.types.recursive)) { + return callback(null, false) + } + // direct + if ((pinType === pinner.types.direct || pinType === pinner.types.all) && + directPins.has(multihash)) { + return callback(null, true, pinner.types.direct) + } + if ((pinType === pinner.types.direct)) { + return callback(null, false) + } + if ((pinType === pinner.types.internal || pinType === pinner.types.all) && + internalPins.has(multihash)) { + return callback(null, true, pinner.types.internal) + } + if ((pinType === pinner.types.internal)) { + return callback(null, false) + } + + // indirect (default) + var cbs = [callback] + var checkedCount = 0 + var done = () => { + // flag pending callbacks to break early when result found + cbs.forEach((cb) => { + cb.fired = true + }) + } + var rkeys = pinner.recursiveKeys() + if (!rkeys.length) { + return callback(null, false) + } + rkeys.forEach((rkey) => { + dagS.get(rkey, (err, obj) => { + if (callback.fired) { return } + if (err) { + done() + return callback(err) + } + var thisCb = (err, has) => { + if (err) { + done() + return callback(err) + } + if (has) { + done() + return callback( + null, true, bs58.encode(obj.multihash()).toString() + ) + } else { + checkedCount++ + if (checkedCount === rkeys.length) { + done() + return callback(null, false) + } + } + } + cbs.push(thisCb) + pinner.utils.hasChild(obj, multihash, thisCb) + }) + }) + }, + + directKeys: () => { + return directPins.toArray() + }, + + directKeyStrings: () => { + return directPins.toStringArray() + }, + + recursiveKeys: () => { + return recursivePins.toArray() + }, + + recursiveKeyStrings: () => { + return recursivePins.toStringArray() + }, + + internalKeys: () => { + return internalPins.toArray() + }, + + internalKeyStrings: () => { + return internalPins.toStringArray() + }, + + // encodes and writes pinner key sets to the datastore + flush: (callback) => { + // callback (err, root) + var newInternalPins = new KeySet() + var logInternalKey = (multihash) => { + newInternalPins.add(multihash) + } + // each key set will be stored as a DAG node, and root will link to both + var root = new DAGNode() + pinner.utils.storeSet(pinner.directKeys(), logInternalKey, (err, dRoot) => { + if (err) { return callback(err) } + root.addNodeLink(pinner.types.direct, dRoot) + pinner.utils.storeSet(pinner.recursiveKeys(), logInternalKey, (err, rRoot) => { + if (err) { return callback(err) } + root.addNodeLink(pinner.types.recursive, rRoot) + // the set nodes link to an empty node, so make sure it's added + dagS.add(new DAGNode(), (err) => { + if (err) { return callback(err) } + // then add the root node to dagS + dagS.add(root, (err) => { + if (err) { return callback(err) } + // update pinner's internal pin set + logInternalKey(root.multihash()) + internalPins = newInternalPins + // save a reference to root hash under a consistent key + var pseudoblock = { + data: root.marshal(), + key: pinDataStoreKey, + extension: null + } + repo.datastore.put(pseudoblock, (err, metadata) => { + if (err) { return callback(err) } + return callback(null, root) + }) + }) + }) + }) + }) + }, + + load: (callback) => { + repo.datastore.get(pinDataStoreKey, (err, pseudoblock) => { + if (err) { return callback(err) } + var rootBytes = pseudoblock.data + var root = (new DAGNode()).unMarshal(rootBytes) + var newInternalPins = new KeySet([root.multihash()]) + var logInternalKey = (multihash) => { + newInternalPins.add(multihash) + } + pinner.utils.loadSet( + root, pinner.types.recursive, logInternalKey, (err, keys) => { + if (err) { return callback(err) } + recursivePins = new KeySet(keys) + pinner.utils.loadSet( + root, pinner.types.direct, logInternalKey, (err, keys) => { + if (err) { return callback(err) } + directPins = new KeySet(keys) + internalPins = newInternalPins + return callback() + } + ) + } + ) + }) + } + } + return pinner +} diff --git a/test/core/both/test-pin.js b/test/core/both/test-pin.js new file mode 100644 index 0000000000..9c5fb79776 --- /dev/null +++ b/test/core/both/test-pin.js @@ -0,0 +1,267 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const IPFS = require('../../../src/core') +const bs58 = require('bs58') +const mDAG = require('ipfs-merkle-dag') +const DAGNode = mDAG.DAGNode +const createTempRepo = require('../../utils/temp-repo') + +const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' + +describe('pinner', function () { + this.timeout(20000) + var ipfs + var repo + var pinner + var Obj = {} + + function encode (key) { + return bs58.encode(key).toString() + } + + before((done) => { + repo = createTempRepo() + ipfs = new IPFS(repo) + pinner = ipfs.pinner + ipfs.init({ emptyRepo: true }, (err) => { + expect(err).to.not.exist + ipfs.load(() => { + // Use this tree for multiple tests + // + // B E + // / \ + // A C + // \ + // D + + var labels = ['A', 'B', 'C', 'D', 'E'] + labels.forEach((label) => { + Obj[label] = new DAGNode(new Buffer('Node ' + label)) + }) + // make links from bottom up to avoid mutating the hash after linking + Obj.C.addNodeLink('Child D', Obj.D) + Obj.B.addNodeLink('Child A', Obj.A) + Obj.B.addNodeLink('Child C', Obj.C) + var count = 0 + labels.forEach((label) => { + ipfs.object.put(Obj[label], (err) => { + expect(err).to.not.exist + count++ + if (count === labels.length) { + done() + } + }) + }) + }) + }) + }) + + beforeEach((done) => { + pinner.clear() + done() + }) + + describe('pin', () => { + it('pins object directly', (done) => { + pinner.pin(Obj.A, false, (err) => { + expect(err).to.not.exist + pinner.isPinned(Obj.A.multihash(), (err, pinned, reason) => { + expect(err).to.not.exist + expect(pinned).to.be.true + expect(reason).to.equal(pinner.types.direct) + done() + }) + }) + }) + + it('pins recursively by default', (done) => { + // direct pin A which is child of B + pinner.pin(Obj.A, false, (err) => { + expect(err).to.not.exist + // recursive pin B which has children A and C + pinner.pin(Obj.B, (err) => { + expect(err).to.not.exist + // B should be 'recursive' pin + pinner.isPinned(Obj.B.multihash(), (err, pinned, reason) => { + expect(err).to.not.exist + expect(pinned).to.be.true + expect(reason).to.equal(pinner.types.recursive) + // A should still be 'direct' pin + pinner.isPinned(Obj.A.multihash(), (err, pinned, reason) => { + expect(err).to.not.exist + expect(pinned).to.be.true + expect(reason).to.equal(pinner.types.direct) + // C should be 'indirect' pin + pinner.isPinned(Obj.C.multihash(), (err, pinned, reason) => { + expect(err).to.not.exist + expect(pinned).to.be.true + // indirect pin 'reason' is the b58 hash of recursive root pin + expect(reason).to.equal(encode(Obj.B.multihash())) + done() + }) + }) + }) + }) + }) + }) + + it('rejects direct pin if already recursively pinned', (done) => { + // recursive pin B which has children A and C + pinner.pin(Obj.B, (err) => { + expect(err).to.not.exist + // direct pin B should fail + pinner.pin(Obj.B, false, (err) => { + expect(err).to.equal(encode(Obj.B.multihash()) + + ' already pinned recursively') + // pinning recursively again should succeed + pinner.pin(Obj.B, (err) => { + expect(err).to.not.exist + done() + }) + }) + }) + }) + + it('rejects recursive pin if child object is not stored', (done) => { + Obj.Y = new DAGNode(new Buffer('Node Y')) + Obj.Z = new DAGNode(new Buffer('Node Z')) + Obj.Y.addNodeLink('Child Z', Obj.Z) + ipfs.object.put(Obj.Y, (err) => { + expect(err).to.not.exist + // this should fail because Z is not stored + pinner.pin(Obj.Y, (err) => { + expect(err).to.exist + ipfs.object.put(Obj.Z, (err) => { + expect(err).to.not.exist + // now it should succeed + pinner.pin(Obj.Y, (err) => { + expect(err).to.not.exist + done() + }) + }) + }) + }) + }) + }) + + describe('unpin', () => { + it('unpins directly pinned object', (done) => { + pinner.pin(Obj.A, false, (err) => { + expect(err).to.not.exist + pinner.unpin(Obj.A.multihash(), false, (err) => { + expect(err).to.not.exist + pinner.isPinned(Obj.A.multihash(), (err, pinned) => { + expect(err).to.not.exist + expect(pinned).to.be.false + done() + }) + }) + }) + }) + + it('unpins recursively by default', (done) => { + const bs58A = encode(Obj.A.multihash()) + const bs58B = encode(Obj.B.multihash()) + // recursive pin B which has children A and C + pinner.pin(Obj.B, (err) => { + expect(err).to.not.exist + // indirect pin A should not be unpinnable while B is pinned + pinner.unpin(Obj.A.multihash(), (err) => { + expect(err).to.equal( + bs58A + ' is pinned indirectly under ' + bs58B + ) + // unpinning B should also unpin A + pinner.unpin(Obj.B.multihash(), (err) => { + expect(err).to.not.exist + pinner.isPinned(Obj.B.multihash(), (err, pinned) => { + expect(err).to.not.exist + expect(pinned).to.be.false + pinner.isPinned(Obj.A.multihash(), (err, pinned) => { + expect(err).to.not.exist + expect(pinned).to.be.false + done() + }) + }) + }) + }) + }) + }) + }) + + describe('flush and load (roundtrip)', () => { + it('writes pinned keys to datastore and reads them back', (done) => { + var checkInternal = (root) => { + var internal = pinner.internalKeyStrings() + expect(internal.length).to.equal(4) + internal = new Set(internal) + expect(internal.has(emptyKeyHash)).to.be.true + expect(internal.has(encode(root.multihash()))).to.be.true + expect(root.links.length).to.equal(2) + root.links.forEach((link) => { + expect(internal.has(encode(link.hash))).to.be.true + }) + } + var checkClear = () => { + expect(pinner.directKeys().length).to.equal(0) + expect(pinner.recursiveKeys().length).to.equal(0) + expect(pinner.internalKeys().length).to.equal(0) + } + checkClear() + // recursive pin + pinner.pin(Obj.B, (err) => { + expect(err).to.not.exist + // direct pin + pinner.pin(Obj.E, false, (err) => { + expect(err).to.not.exist + // save to datastore + pinner.flush((err, root) => { + // internalPins should have a recursive root node, a direct root + // node, a root header node with links to both, and an empty node + expect(err).to.not.exist + checkInternal(root) + // clear from memory + pinner.clear() + checkClear() + // load from datastore + pinner.load((err) => { + expect(err).to.not.exist + // Obj.E should be restored as a direct pin + var direct = pinner.directKeyStrings() + expect(direct.length).to.equal(1) + expect(direct[0]).to.equal(encode(Obj.E.multihash())) + // Obj.B should be restored as a recursive pin + var recursive = pinner.recursiveKeyStrings() + expect(recursive.length).to.equal(1) + expect(recursive[0]).to.equal(encode(Obj.B.multihash())) + // Internal should be the same as before + checkInternal(root) + done() + }) + }) + }) + }) + }) + }) + + describe('utils', () => { + describe('hasChild', () => { + it('finds if child hash is somewhere in object tree', (done) => { + pinner.utils.hasChild(Obj.B, Obj.D.multihash(), (err, has) => { + expect(err).to.not.exist + expect(has).to.be.true + pinner.utils.hasChild(Obj.B, Obj.E.multihash(), (err, has) => { + expect(err).to.not.exist + expect(has).to.be.false + done() + }) + }) + }) + }) + }) + + after((done) => { + repo.teardown(done) + }) +})