From 44045b02eeb09681d075979fb10f8207c842056d Mon Sep 17 00:00:00 2001 From: dirkmc Date: Mon, 26 Aug 2019 13:10:55 -0400 Subject: [PATCH] feat: garbage collection (#2022) resolves https://github.com/ipfs/js-ipfs/issues/2012 Depends on - [x] https://github.com/ipfs/js-ipfs/pull/2004 - [x] https://github.com/ipfs/js-ipfs-http-client/pull/992 - [x] https://github.com/ipfs/interface-js-ipfs-core/pull/462 - [x] https://github.com/achingbrain/mortice/pull/1 TODO: - [x] Core (mark and sweep) - [x] CLI - [x] http interface - [x] interface-js-ipfs-core tests https://github.com/ipfs/interface-js-ipfs-core/pull/462 - [x] nodejs-specific tests - [x] Locking - [x] Tests for locking --- package.json | 7 +- src/cli/commands/repo/gc.js | 29 +- src/core/components/block.js | 25 +- .../files-regular/add-pull-stream.js | 8 +- src/core/components/object.js | 28 +- src/core/components/pin.js | 488 +++++------------- src/core/components/pin/gc-lock.js | 80 +++ src/core/components/pin/gc.js | 158 ++++++ src/core/components/pin/pin-manager.js | 322 ++++++++++++ src/core/components/{ => pin}/pin-set.js | 29 +- src/core/components/{ => pin}/pin.proto.js | 0 src/core/components/repo.js | 9 +- src/core/index.js | 7 + src/http/api/resources/repo.js | 27 +- src/http/api/routes/repo.js | 8 + src/utils/mutex.js | 61 +++ test/cli/gc.js | 66 +++ test/cli/repo.js | 1 - test/core/gc-lock.spec.js | 301 +++++++++++ test/core/gc.spec.js | 279 ++++++++++ test/core/interface.spec.js | 10 +- test/core/pin-set.js | 55 +- test/http-api/interface.js | 10 +- test/sharness/t0030-gc.sh | 26 + 24 files changed, 1599 insertions(+), 435 deletions(-) create mode 100644 src/core/components/pin/gc-lock.js create mode 100644 src/core/components/pin/gc.js create mode 100644 src/core/components/pin/pin-manager.js rename src/core/components/{ => pin}/pin-set.js (89%) rename src/core/components/{ => pin}/pin.proto.js (100%) create mode 100644 src/utils/mutex.js create mode 100644 test/cli/gc.js create mode 100644 test/core/gc-lock.spec.js create mode 100644 test/core/gc.spec.js create mode 100755 test/sharness/t0030-gc.sh diff --git a/package.json b/package.json index 0459207866..16e9307b7d 100644 --- a/package.json +++ b/package.json @@ -65,7 +65,7 @@ "array-shuffle": "^1.0.1", "async": "^2.6.1", "async-iterator-all": "^1.0.0", - "async-iterator-to-pull-stream": "^1.1.0", + "async-iterator-to-pull-stream": "^1.3.0", "async-iterator-to-stream": "^1.1.0", "base32.js": "~0.1.0", "bignumber.js": "^9.0.0", @@ -83,6 +83,7 @@ "debug": "^4.1.0", "dlv": "^1.1.3", "err-code": "^2.0.0", + "explain-error": "^1.0.4", "file-type": "^12.0.1", "fnv1a": "^1.0.1", "fsm-event": "^2.1.0", @@ -95,7 +96,7 @@ "ipfs-bitswap": "~0.25.1", "ipfs-block": "~0.8.1", "ipfs-block-service": "~0.15.2", - "ipfs-http-client": "^33.1.0", + "ipfs-http-client": "^33.1.1", "ipfs-http-response": "~0.3.1", "ipfs-mfs": "~0.12.0", "ipfs-multipart": "~0.1.1", @@ -139,6 +140,7 @@ "merge-options": "^1.0.1", "mime-types": "^2.1.21", "mkdirp": "~0.5.1", + "mortice": "^1.2.2", "multiaddr": "^6.1.0", "multiaddr-to-uri": "^5.0.0", "multibase": "~0.6.0", @@ -193,6 +195,7 @@ "ipfsd-ctl": "^0.43.0", "libp2p-websocket-star": "~0.10.2", "ncp": "^2.0.0", + "p-event": "^4.1.0", "qs": "^6.5.2", "rimraf": "^3.0.0", "sinon": "^7.4.0", diff --git a/src/cli/commands/repo/gc.js b/src/cli/commands/repo/gc.js index ec3b547e93..b805ac9934 100644 --- a/src/cli/commands/repo/gc.js +++ b/src/cli/commands/repo/gc.js @@ -5,12 +5,31 @@ module.exports = { describe: 'Perform a garbage collection sweep on the repo.', - builder: {}, + builder: { + quiet: { + alias: 'q', + desc: 'Write minimal output', + type: 'boolean', + default: false + }, + 'stream-errors': { + desc: 'Output individual errors thrown when deleting blocks.', + type: 'boolean', + default: true + } + }, - handler (argv) { - argv.resolve((async () => { - const ipfs = await argv.getIpfs() - await ipfs.repo.gc() + handler ({ getIpfs, print, quiet, streamErrors, resolve }) { + resolve((async () => { + const ipfs = await getIpfs() + const res = await ipfs.repo.gc() + for (const r of res) { + if (r.err) { + streamErrors && print(r.err.message, true, true) + } else { + print((quiet ? '' : 'removed ') + r.cid) + } + } })()) } } diff --git a/src/core/components/block.js b/src/core/components/block.js index 6a1bb960d1..305ad2df3d 100644 --- a/src/core/components/block.js +++ b/src/core/components/block.js @@ -81,17 +81,19 @@ module.exports = function block (self) { cb(null, new Block(block, cid)) }) }, - (block, cb) => self._blockService.put(block, (err) => { - if (err) { - return cb(err) - } + (block, cb) => self._gcLock.readLock((_cb) => { + self._blockService.put(block, (err) => { + if (err) { + return _cb(err) + } - if (options.preload !== false) { - self._preload(block.cid) - } + if (options.preload !== false) { + self._preload(block.cid) + } - cb(null, block) - }) + _cb(null, block) + }) + }, cb) ], callback) }), rm: promisify((cid, callback) => { @@ -100,7 +102,10 @@ module.exports = function block (self) { } catch (err) { return setImmediate(() => callback(errCode(err, 'ERR_INVALID_CID'))) } - self._blockService.delete(cid, callback) + + // We need to take a write lock here to ensure that adding and removing + // blocks are exclusive operations + self._gcLock.writeLock((cb) => self._blockService.delete(cid, cb), callback) }), stat: promisify((cid, options, callback) => { if (typeof options === 'function') { diff --git a/src/core/components/files-regular/add-pull-stream.js b/src/core/components/files-regular/add-pull-stream.js index 8f46c8913f..ee815f1487 100644 --- a/src/core/components/files-regular/add-pull-stream.js +++ b/src/core/components/files-regular/add-pull-stream.js @@ -116,7 +116,9 @@ function pinFile (file, self, opts, cb) { const isRootDir = !file.path.includes('/') const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg if (shouldPin) { - return self.pin.add(file.hash, { preload: false }, err => cb(err, file)) + // Note: addPullStream() has already taken a GC lock, so tell + // pin.add() not to take a (second) GC lock + return self.pin.add(file.hash, { preload: false, lock: false }, err => cb(err, file)) } else { cb(null, file) } @@ -156,7 +158,7 @@ module.exports = function (self) { } opts.progress = progress - return pull( + return self._gcLock.pullReadLock(() => pull( pullMap(content => normalizeContent(content, opts)), pullFlatten(), pullMap(file => ({ @@ -167,6 +169,6 @@ module.exports = function (self) { pullAsyncMap((file, cb) => prepareFile(file, self, opts, cb)), pullMap(file => preloadFile(file, self, opts)), pullAsyncMap((file, cb) => pinFile(file, self, opts, cb)) - ) + )) } } diff --git a/src/core/components/object.js b/src/core/components/object.js index 87f03075aa..b7e9fa1e83 100644 --- a/src/core/components/object.js +++ b/src/core/components/object.js @@ -242,19 +242,21 @@ module.exports = function object (self) { } function next () { - self._ipld.put(node, multicodec.DAG_PB, { - cidVersion: 0, - hashAlg: multicodec.SHA2_256 - }).then( - (cid) => { - if (options.preload !== false) { - self._preload(cid) - } - - callback(null, cid) - }, - (error) => callback(error) - ) + self._gcLock.readLock((cb) => { + self._ipld.put(node, multicodec.DAG_PB, { + cidVersion: 0, + hashAlg: multicodec.SHA2_256 + }).then( + (cid) => { + if (options.preload !== false) { + self._preload(cid) + } + + cb(null, cid) + }, + cb + ) + }, callback) } }), diff --git a/src/core/components/pin.js b/src/core/components/pin.js index a41cba72d4..025fdb75ed 100644 --- a/src/core/components/pin.js +++ b/src/core/components/pin.js @@ -2,192 +2,25 @@ 'use strict' const promisify = require('promisify-es6') -const { DAGNode, DAGLink } = require('ipld-dag-pb') const CID = require('cids') const map = require('async/map') const mapSeries = require('async/mapSeries') -const series = require('async/series') -const parallel = require('async/parallel') -const eachLimit = require('async/eachLimit') const waterfall = require('async/waterfall') -const detectLimit = require('async/detectLimit') const setImmediate = require('async/setImmediate') -const queue = require('async/queue') -const { Key } = require('interface-datastore') const errCode = require('err-code') const multibase = require('multibase') -const multicodec = require('multicodec') -const createPinSet = require('./pin-set') const { resolvePath } = require('../utils') - -// arbitrary limit to the number of concurrent dag operations -const concurrencyLimit = 300 -const pinDataStoreKey = new Key('/local/pins') +const PinManager = require('./pin/pin-manager') +const PinTypes = PinManager.PinTypes function toB58String (hash) { return new CID(hash).toBaseEncodedString() } -function invalidPinTypeErr (type) { - const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` - return errCode(new Error(errMsg), 'ERR_INVALID_PIN_TYPE') -} - module.exports = (self) => { - const repo = self._repo const dag = self.dag - const pinset = createPinSet(dag) - const types = { - direct: 'direct', - recursive: 'recursive', - indirect: 'indirect', - all: 'all' - } - - let directPins = new Set() - let recursivePins = new Set() - - const directKeys = () => - Array.from(directPins).map(key => new CID(key).buffer) - const recursiveKeys = () => - Array.from(recursivePins).map(key => new CID(key).buffer) - - function walkDag ({ cid, preload = false, onCid = () => {} }, cb) { - const q = queue(function ({ cid }, done) { - dag.get(cid, { preload }, function (err, result) { - if (err) { - return done(err) - } - - onCid(cid) - - if (result.value.Links) { - q.push(result.value.Links.map(link => ({ - cid: link.Hash - }))) - } - - done() - }) - }, concurrencyLimit) - q.drain = () => { - cb() - } - q.error = (err) => { - q.kill() - cb(err) - } - q.push({ cid }) - } - - function getIndirectKeys ({ preload }, callback) { - const indirectKeys = new Set() - eachLimit(recursiveKeys(), concurrencyLimit, (multihash, cb) => { - // load every hash in the graph - walkDag({ - cid: new CID(multihash), - preload: preload || false, - onCid: (cid) => { - cid = cid.toString() - - // recursive pins pre-empt indirect pins - if (!recursivePins.has(cid)) { - indirectKeys.add(cid) - } - } - }, cb) - }, (err) => { - if (err) { return callback(err) } - callback(null, Array.from(indirectKeys)) - }) - } - - // Encode and write pin key sets to the datastore: - // a DAGLink for each of the recursive and direct pinsets - // a DAGNode holding those as DAGLinks, a kind of root pin - function flushPins (callback) { - let dLink, rLink, root - series([ - // create a DAGLink to the node with direct pins - cb => waterfall([ - cb => pinset.storeSet(directKeys(), cb), - ({ node, cid }, cb) => { - try { - cb(null, new DAGLink(types.direct, node.size, cid)) - } catch (err) { - cb(err) - } - }, - (link, cb) => { dLink = link; cb(null) } - ], cb), - - // create a DAGLink to the node with recursive pins - cb => waterfall([ - cb => pinset.storeSet(recursiveKeys(), cb), - ({ node, cid }, cb) => { - try { - cb(null, new DAGLink(types.recursive, node.size, cid)) - } catch (err) { - cb(err) - } - }, - (link, cb) => { rLink = link; cb(null) } - ], cb), - - // the pin-set nodes link to a special 'empty' node, so make sure it exists - cb => { - let empty - - try { - empty = DAGNode.create(Buffer.alloc(0)) - } catch (err) { - return cb(err) - } - - dag.put(empty, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }, cb) - }, - - // create a root node with DAGLinks to the direct and recursive DAGs - cb => { - let node - - try { - node = DAGNode.create(Buffer.alloc(0), [dLink, rLink]) - } catch (err) { - return cb(err) - } - - root = node - dag.put(root, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }, (err, cid) => { - if (!err) { - root.multihash = cid.buffer - } - cb(err) - }) - }, - - // hack for CLI tests - cb => repo.closed ? repo.open(cb) : cb(null, null), - - // save root to datastore under a consistent key - cb => repo.datastore.put(pinDataStoreKey, root.multihash, cb) - ], (err, res) => { - if (err) { return callback(err) } - self.log(`Flushed pins with root: ${root}`) - return callback(null, root) - }) - } + const pinManager = new PinManager(self._repo, dag) const pin = { add: promisify((paths, options, callback) => { @@ -203,56 +36,65 @@ module.exports = (self) => { resolvePath(self.object, paths, (err, mhs) => { if (err) { return callback(err) } - // verify that each hash can be pinned - map(mhs, (multihash, cb) => { - const cid = new CID(multihash) - const key = cid.toBaseEncodedString() + const pinAdd = (pinComplete) => { + // verify that each hash can be pinned + map(mhs, (multihash, cb) => { + const cid = new CID(multihash) + const key = cid.toBaseEncodedString() - if (recursive) { - if (recursivePins.has(key)) { - // it's already pinned recursively - return cb(null, key) - } - - // entire graph of nested links should be pinned, - // so make sure we have all the objects - walkDag({ - dag, - cid, - preload: options.preload - }, (err) => cb(err, key)) - } else { - if (recursivePins.has(key)) { - // recursive supersedes direct, can't have both - return cb(new Error(`${key} already pinned recursively`)) - } - if (directPins.has(key)) { - // already directly pinned - return cb(null, key) - } + if (recursive) { + if (pinManager.recursivePins.has(key)) { + // it's already pinned recursively + return cb(null, key) + } - // make sure we have the object - dag.get(cid, { preload: options.preload }, (err) => { - if (err) { - return cb(err) + // entire graph of nested links should be pinned, + // so make sure we have all the objects + pinManager.fetchCompleteDag(key, { preload: options.preload }, (err) => { + if (err) { return cb(err) } + // found all objects, we can add the pin + return cb(null, key) + }) + } else { + if (pinManager.recursivePins.has(key)) { + // recursive supersedes direct, can't have both + return cb(new Error(`${key} already pinned recursively`)) } - // found the object, we can add the pin - return cb(null, key) - }) - } - }, (err, results) => { - if (err) { return callback(err) } + if (pinManager.directPins.has(key)) { + // already directly pinned + return cb(null, key) + } + + // make sure we have the object + dag.get(cid, { preload: options.preload }, (err) => { + if (err) { return cb(err) } + // found the object, we can add the pin + return cb(null, key) + }) + } + }, (err, results) => { + if (err) { return pinComplete(err) } - // update the pin sets in memory - const pinset = recursive ? recursivePins : directPins - results.forEach(key => pinset.add(key)) + // update the pin sets in memory + const pinset = recursive ? pinManager.recursivePins : pinManager.directPins + results.forEach(key => pinset.add(key)) - // persist updated pin sets to datastore - flushPins((err, root) => { - if (err) { return callback(err) } - callback(null, results.map(hash => ({ hash }))) + // persist updated pin sets to datastore + pinManager.flushPins((err, root) => { + if (err) { return pinComplete(err) } + pinComplete(null, results.map(hash => ({ hash }))) + }) }) - }) + } + + // When adding a file, we take a lock that gets released after pinning + // is complete, so don't take a second lock here + const lock = options.lock !== false + if (lock) { + self._gcLock.readLock(pinAdd, callback) + } else { + pinAdd(callback) + } }) }), @@ -274,55 +116,57 @@ module.exports = (self) => { resolvePath(self.object, paths, (err, mhs) => { if (err) { return callback(err) } - // verify that each hash can be unpinned - map(mhs, (multihash, cb) => { - pin._isPinnedWithType(multihash, types.all, (err, res) => { - if (err) { return cb(err) } - const { pinned, reason } = res - const key = toB58String(multihash) - if (!pinned) { - return cb(new Error(`${key} is not pinned`)) - } + self._gcLock.readLock((lockCb) => { + // verify that each hash can be unpinned + map(mhs, (multihash, cb) => { + pinManager.isPinnedWithType(multihash, PinTypes.all, (err, res) => { + if (err) { return cb(err) } + const { pinned, reason } = res + const key = toB58String(multihash) + if (!pinned) { + return cb(new Error(`${key} is not pinned`)) + } - switch (reason) { - case (types.recursive): - if (recursive) { + switch (reason) { + case (PinTypes.recursive): + if (recursive) { + return cb(null, key) + } else { + return cb(new Error(`${key} is pinned recursively`)) + } + case (PinTypes.direct): return cb(null, key) - } else { - return cb(new Error(`${key} is pinned recursively`)) - } - case (types.direct): - return cb(null, key) - default: - return cb(new Error( - `${key} is pinned indirectly under ${reason}` - )) - } - }) - }, (err, results) => { - if (err) { return callback(err) } - - // update the pin sets in memory - results.forEach(key => { - if (recursive && recursivePins.has(key)) { - recursivePins.delete(key) - } else { - directPins.delete(key) - } - }) + default: + return cb(new Error( + `${key} is pinned indirectly under ${reason}` + )) + } + }) + }, (err, results) => { + if (err) { return lockCb(err) } + + // update the pin sets in memory + results.forEach(key => { + if (recursive && pinManager.recursivePins.has(key)) { + pinManager.recursivePins.delete(key) + } else { + pinManager.directPins.delete(key) + } + }) - // persist updated pin sets to datastore - flushPins((err, root) => { - if (err) { return callback(err) } - self.log(`Removed pins: ${results}`) - callback(null, results.map(hash => ({ hash }))) + // persist updated pin sets to datastore + pinManager.flushPins((err, root) => { + if (err) { return lockCb(err) } + self.log(`Removed pins: ${results}`) + lockCb(null, results.map(hash => ({ hash }))) + }) }) - }) + }, callback) }) }), ls: promisify((paths, options, callback) => { - let type = types.all + let type = PinTypes.all if (typeof paths === 'function') { callback = paths options = {} @@ -339,27 +183,28 @@ module.exports = (self) => { options = options || {} if (options.type) { - if (typeof options.type !== 'string') { - return setImmediate(() => callback(invalidPinTypeErr(options.type))) + type = options.type + if (typeof options.type === 'string') { + type = options.type.toLowerCase() + } + const err = PinManager.checkPinType(type) + if (err) { + return setImmediate(() => callback(err)) } - type = options.type.toLowerCase() - } - if (!Object.keys(types).includes(type)) { - return setImmediate(() => callback(invalidPinTypeErr(type))) } if (paths) { // check the pinned state of specific hashes waterfall([ (cb) => resolvePath(self.object, paths, cb), - (hashes, cb) => mapSeries(hashes, (hash, done) => pin._isPinnedWithType(hash, type, done), cb), + (hashes, cb) => mapSeries(hashes, (hash, done) => pinManager.isPinnedWithType(hash, type, done), cb), (results, cb) => { results = results .filter(result => result.pinned) .map(({ key, reason }) => { switch (reason) { - case types.direct: - case types.recursive: + case PinTypes.direct: + case PinTypes.recursive: return { hash: key, type: reason @@ -367,7 +212,7 @@ module.exports = (self) => { default: return { hash: key, - type: `${types.indirect} through ${reason}` + type: `${PinTypes.indirect} through ${reason}` } } }) @@ -382,34 +227,37 @@ module.exports = (self) => { } else { // show all pinned items of type let pins = [] - if (type === types.direct || type === types.all) { + + if (type === PinTypes.direct || type === PinTypes.all) { pins = pins.concat( - Array.from(directPins).map(hash => ({ - type: types.direct, + Array.from(pinManager.directPins).map(hash => ({ + type: PinTypes.direct, hash })) ) } - if (type === types.recursive || type === types.all) { + + if (type === PinTypes.recursive || type === PinTypes.all) { pins = pins.concat( - Array.from(recursivePins).map(hash => ({ - type: types.recursive, + Array.from(pinManager.recursivePins).map(hash => ({ + type: PinTypes.recursive, hash })) ) } - if (type === types.indirect || type === types.all) { - getIndirectKeys(options, (err, indirects) => { + + if (type === PinTypes.indirect || type === PinTypes.all) { + pinManager.getIndirectKeys(options, (err, indirects) => { if (err) { return callback(err) } pins = pins // if something is pinned both directly and indirectly, // report the indirect entry .filter(({ hash }) => !indirects.includes(hash) || - (indirects.includes(hash) && !directPins.has(hash)) + (indirects.includes(hash) && !pinManager.directPins.has(hash)) ) .concat(indirects.map(hash => ({ - type: types.indirect, + type: PinTypes.indirect, hash }))) return callback(null, pins) @@ -420,93 +268,9 @@ module.exports = (self) => { } }), - _isPinnedWithType: promisify((multihash, type, callback) => { - const key = toB58String(multihash) - const { recursive, direct, all } = types - - // recursive - if ((type === recursive || type === all) && recursivePins.has(key)) { - return callback(null, { - key, - pinned: true, - reason: recursive - }) - } - - if (type === recursive) { - return callback(null, { - key, - pinned: false - }) - } - - // direct - if ((type === direct || type === all) && directPins.has(key)) { - return callback(null, { - key, - pinned: true, - reason: direct - }) - } - - if (type === direct) { - return callback(null, { - key, - pinned: false - }) - } - - // indirect (default) - // check each recursive key to see if multihash is under it - // arbitrary limit, enables handling 1000s of pins. - detectLimit(recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => { - waterfall([ - (done) => dag.get(cid, '', { preload: false }, done), - (result, done) => done(null, result.value), - (node, done) => pinset.hasDescendant(node, key, done) - ], cb) - }, (err, cid) => callback(err, { - key, - pinned: Boolean(cid), - reason: cid - })) - }), - - _load: promisify(callback => { - waterfall([ - // hack for CLI tests - (cb) => repo.closed ? repo.datastore.open(cb) : cb(null, null), - (_, cb) => repo.datastore.has(pinDataStoreKey, cb), - (has, cb) => has ? cb() : cb(new Error('No pins to load')), - (cb) => repo.datastore.get(pinDataStoreKey, cb), - (mh, cb) => { - dag.get(new CID(mh), '', { preload: false }, cb) - } - ], (err, pinRoot) => { - if (err) { - if (err.message === 'No pins to load') { - self.log('No pins to load') - return callback() - } else { - return callback(err) - } - } - - parallel([ - cb => pinset.loadSet(pinRoot.value, types.recursive, cb), - cb => pinset.loadSet(pinRoot.value, types.direct, cb) - ], (err, keys) => { - if (err) { return callback(err) } - const [rKeys, dKeys] = keys - - directPins = new Set(dKeys.map(toB58String)) - recursivePins = new Set(rKeys.map(toB58String)) - - self.log('Loaded pins from the datastore') - return callback(null) - }) - }) - }) + _isPinnedWithType: promisify(pinManager.isPinnedWithType.bind(pinManager)), + _getInternalBlocks: promisify(pinManager.getInternalBlocks.bind(pinManager)), + _load: promisify(pinManager.load.bind(pinManager)) } return pin diff --git a/src/core/components/pin/gc-lock.js b/src/core/components/pin/gc-lock.js new file mode 100644 index 0000000000..082b7a687e --- /dev/null +++ b/src/core/components/pin/gc-lock.js @@ -0,0 +1,80 @@ +'use strict' + +const pull = require('pull-stream/pull') +const pullThrough = require('pull-stream/throughs/through') +const pullAsyncMap = require('pull-stream/throughs/async-map') +const Mutex = require('../../../utils/mutex') +const log = require('debug')('ipfs:gc:lock') + +class GCLock { + constructor (repoOwner, options = {}) { + this.mutex = new Mutex(repoOwner, { ...options, log }) + } + + readLock (lockedFn, cb) { + return this.mutex.readLock(lockedFn, cb) + } + + writeLock (lockedFn, cb) { + return this.mutex.writeLock(lockedFn, cb) + } + + pullReadLock (lockedPullFn) { + return this.pullLock('readLock', lockedPullFn) + } + + pullWriteLock (lockedPullFn) { + return this.pullLock('writeLock', lockedPullFn) + } + + pullLock (type, lockedPullFn) { + const pullLocker = new PullLocker(this.mutex, type) + + return pull( + pullLocker.take(), + lockedPullFn(), + pullLocker.release() + ) + } +} + +class PullLocker { + constructor (mutex, type) { + this.mutex = mutex + this.type = type + + // The function to call to release the lock. It is set when the lock is taken + this.releaseLock = null + } + + take () { + return pullAsyncMap((i, cb) => { + // Check if the lock has already been acquired. + // Note: new items will only come through the pull stream once the first + // item has acquired a lock. + if (this.releaseLock) { + // The lock has been acquired so return immediately + return cb(null, i) + } + + // Request the lock + this.mutex[this.type]((releaseLock) => { + // The lock has been granted, so run the locked piece of code + cb(null, i) + + // Save the release function to be called when the stream completes + this.releaseLock = releaseLock + }) + }) + } + + // Releases the lock + release () { + return pullThrough(null, (err) => { + // When the stream completes, release the lock + this.releaseLock(err) + }) + } +} + +module.exports = GCLock diff --git a/src/core/components/pin/gc.js b/src/core/components/pin/gc.js new file mode 100644 index 0000000000..544b151bf3 --- /dev/null +++ b/src/core/components/pin/gc.js @@ -0,0 +1,158 @@ +'use strict' + +const promisify = require('promisify-es6') +const CID = require('cids') +const base32 = require('base32.js') +const parallel = require('async/parallel') +const mapLimit = require('async/mapLimit') +const expErr = require('explain-error') +const { cidToString } = require('../../../utils/cid') +const log = require('debug')('ipfs:gc') +// TODO: Use exported key from root when upgraded to ipfs-mfs@>=13 +// https://github.com/ipfs/js-ipfs-mfs/pull/58 +const { MFS_ROOT_KEY } = require('ipfs-mfs/src/core/utils/constants') + +// Limit on the number of parallel block remove operations +const BLOCK_RM_CONCURRENCY = 256 + +// Perform mark and sweep garbage collection +module.exports = function gc (self) { + return promisify((callback) => { + const start = Date.now() + log('Creating set of marked blocks') + + self._gcLock.writeLock((lockCb) => { + parallel([ + // Get all blocks keys from the blockstore + (cb) => self._repo.blocks.query({ keysOnly: true }, cb), + // Mark all blocks that are being used + (cb) => createMarkedSet(self, cb) + ], (err, [blockKeys, markedSet]) => { + if (err) { + log('GC failed to fetch all block keys and created marked set', err) + return lockCb(err) + } + + // Delete blocks that are not being used + deleteUnmarkedBlocks(self, markedSet, blockKeys, (err, res) => { + log(`Complete (${Date.now() - start}ms)`) + + if (err) { + log('GC failed to delete unmarked blocks', err) + return lockCb(err) + } + lockCb(null, res) + }) + }) + }, callback) + }) +} + +// Get Set of CIDs of blocks to keep +function createMarkedSet (ipfs, callback) { + parallel([ + // All pins, direct and indirect + (cb) => ipfs.pin.ls((err, pins) => { + if (err) { + return cb(expErr(err, 'Could not list pinned blocks')) + } + log(`Found ${pins.length} pinned blocks`) + const cids = pins.map(p => new CID(p.hash)) + // log(' ' + cids.join('\n ')) + cb(null, cids) + }), + + // Blocks used internally by the pinner + (cb) => ipfs.pin._getInternalBlocks((err, cids) => { + if (err) { + return cb(expErr(err, 'Could not list pinner internal blocks')) + } + log(`Found ${cids.length} pinner internal blocks`) + // log(' ' + cids.join('\n ')) + cb(null, cids) + }), + + // The MFS root and all its descendants + (cb) => ipfs._repo.root.get(MFS_ROOT_KEY, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + log(`No blocks in MFS`) + return cb(null, []) + } + return cb(expErr(err, 'Could not get MFS root from datastore')) + } + + getDescendants(ipfs, new CID(mh), cb) + }) + ], (err, res) => { + if (err) { + return callback(err) + } + + const cids = [].concat(...res).map(cid => cidToString(cid, { base: 'base32' })) + return callback(null, new Set(cids)) + }) +} + +// Recursively get descendants of the given CID +function getDescendants (ipfs, cid, callback) { + ipfs.refs(cid, { recursive: true }, (err, refs) => { + if (err) { + return callback(expErr(err, 'Could not get MFS root descendants from store')) + } + const cids = [cid, ...refs.map(r => new CID(r.ref))] + log(`Found ${cids.length} MFS blocks`) + // log(' ' + cids.join('\n ')) + callback(null, cids) + }) +} + +// Delete all blocks that are not marked as in use +function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys, callback) { + // Iterate through all blocks and find those that are not in the marked set + // The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ] + const unreferenced = [] + const res = [] + let errCount = 0 + for (const { key: k } of blockKeys) { + try { + const cid = dsKeyToCid(k) + const b32 = cid.toV1().toString('base32') + if (!markedSet.has(b32)) { + unreferenced.push(cid) + } + } catch (err) { + errCount++ + const msg = `Could not convert block with key '${k}' to CID` + log(msg, err) + res.push({ err: new Error(msg + `: ${err.message}`) }) + } + } + + const msg = `Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` + + `Deleting ${unreferenced.length} blocks.` + (errCount ? ` (${errCount} errors)` : '') + log(msg) + // log(' ' + unreferenced.join('\n ')) + + mapLimit(unreferenced, BLOCK_RM_CONCURRENCY, (cid, cb) => { + // Delete blocks from blockstore + ipfs._repo.blocks.delete(cid, (err) => { + const res = { + cid, + err: err && new Error(`Could not delete block with CID ${cid}: ${err.message}`) + } + cb(null, res) + }) + }, (_, delRes) => { + callback(null, res.concat(delRes)) + }) +} + +// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1 +// https://github.com/ipfs/js-ipfs-repo/pull/206 +function dsKeyToCid (key) { + // Block key is of the form / + const decoder = new base32.Decoder() + const buff = decoder.write(key.toString().slice(1)).finalize() + return new CID(Buffer.from(buff)) +} diff --git a/src/core/components/pin/pin-manager.js b/src/core/components/pin/pin-manager.js new file mode 100644 index 0000000000..096980fcec --- /dev/null +++ b/src/core/components/pin/pin-manager.js @@ -0,0 +1,322 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const { DAGNode, DAGLink } = require('ipld-dag-pb') +const CID = require('cids') +const series = require('async/series') +const parallel = require('async/parallel') +const eachLimit = require('async/eachLimit') +const waterfall = require('async/waterfall') +const detectLimit = require('async/detectLimit') +const queue = require('async/queue') +const { Key } = require('interface-datastore') +const errCode = require('err-code') +const multicodec = require('multicodec') +const debug = require('debug') +const { cidToString } = require('../../../utils/cid') + +const createPinSet = require('./pin-set') + +// arbitrary limit to the number of concurrent dag operations +const concurrencyLimit = 300 +const PIN_DS_KEY = new Key('/local/pins') + +function invalidPinTypeErr (type) { + const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` + return errCode(new Error(errMsg), 'ERR_INVALID_PIN_TYPE') +} + +const PinTypes = { + direct: 'direct', + recursive: 'recursive', + indirect: 'indirect', + all: 'all' +} + +class PinManager { + constructor (repo, dag) { + this.repo = repo + this.dag = dag + this.log = debug('ipfs:pin') + this.pinset = createPinSet(dag) + this.directPins = new Set() + this.recursivePins = new Set() + } + + _walkDag ({ cid, preload = false, onCid = () => {} }, cb) { + const q = queue(({ cid }, done) => { + this.dag.get(cid, { preload }, (err, result) => { + if (err) { + return done(err) + } + + onCid(cid) + + if (result.value.Links) { + q.push(result.value.Links.map(link => ({ + cid: link.Hash + }))) + } + + done() + }) + }, concurrencyLimit) + q.drain = () => { + cb() + } + q.error = (err) => { + q.kill() + cb(err) + } + q.push({ cid }) + } + + directKeys () { + return Array.from(this.directPins, key => new CID(key).buffer) + } + + recursiveKeys () { + return Array.from(this.recursivePins, key => new CID(key).buffer) + } + + getIndirectKeys ({ preload }, callback) { + const indirectKeys = new Set() + eachLimit(this.recursiveKeys(), concurrencyLimit, (multihash, cb) => { + this._walkDag({ + cid: new CID(multihash), + preload: preload || false, + onCid: (cid) => { + cid = cid.toString() + + // recursive pins pre-empt indirect pins + if (!this.recursivePins.has(cid)) { + indirectKeys.add(cid) + } + } + }, cb) + }, (err) => { + if (err) { return callback(err) } + callback(null, Array.from(indirectKeys)) + }) + } + + // Encode and write pin key sets to the datastore: + // a DAGLink for each of the recursive and direct pinsets + // a DAGNode holding those as DAGLinks, a kind of root pin + flushPins (callback) { + let dLink, rLink, root + series([ + // create a DAGLink to the node with direct pins + cb => waterfall([ + cb => this.pinset.storeSet(this.directKeys(), cb), + ({ node, cid }, cb) => { + try { + cb(null, new DAGLink(PinTypes.direct, node.size, cid)) + } catch (err) { + cb(err) + } + }, + (link, cb) => { dLink = link; cb(null) } + ], cb), + + // create a DAGLink to the node with recursive pins + cb => waterfall([ + cb => this.pinset.storeSet(this.recursiveKeys(), cb), + ({ node, cid }, cb) => { + try { + cb(null, new DAGLink(PinTypes.recursive, node.size, cid)) + } catch (err) { + cb(err) + } + }, + (link, cb) => { rLink = link; cb(null) } + ], cb), + + // the pin-set nodes link to a special 'empty' node, so make sure it exists + cb => { + let empty + + try { + empty = DAGNode.create(Buffer.alloc(0)) + } catch (err) { + return cb(err) + } + + this.dag.put(empty, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }, cb) + }, + + // create a root node with DAGLinks to the direct and recursive DAGs + cb => { + let node + + try { + node = DAGNode.create(Buffer.alloc(0), [dLink, rLink]) + } catch (err) { + return cb(err) + } + + root = node + this.dag.put(root, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256, + preload: false + }, (err, cid) => { + if (!err) { + root.multihash = cid.buffer + } + cb(err) + }) + }, + + // save root to datastore under a consistent key + cb => this.repo.datastore.put(PIN_DS_KEY, root.multihash, cb) + ], (err, res) => { + if (err) { return callback(err) } + this.log(`Flushed pins with root: ${root}`) + return callback(null, root) + }) + } + + load (callback) { + waterfall([ + (cb) => this.repo.datastore.has(PIN_DS_KEY, cb), + (has, cb) => has ? cb() : cb(new Error('No pins to load')), + (cb) => this.repo.datastore.get(PIN_DS_KEY, cb), + (mh, cb) => { + this.dag.get(new CID(mh), '', { preload: false }, cb) + } + ], (err, pinRoot) => { + if (err) { + if (err.message === 'No pins to load') { + this.log('No pins to load') + return callback() + } else { + return callback(err) + } + } + + parallel([ + cb => this.pinset.loadSet(pinRoot.value, PinTypes.recursive, cb), + cb => this.pinset.loadSet(pinRoot.value, PinTypes.direct, cb) + ], (err, keys) => { + if (err) { return callback(err) } + const [rKeys, dKeys] = keys + + this.directPins = new Set(dKeys.map(k => cidToString(k))) + this.recursivePins = new Set(rKeys.map(k => cidToString(k))) + + this.log('Loaded pins from the datastore') + return callback(null) + }) + }) + } + + isPinnedWithType (multihash, type, callback) { + const key = cidToString(multihash) + const { recursive, direct, all } = PinTypes + + // recursive + if ((type === recursive || type === all) && this.recursivePins.has(key)) { + return callback(null, { + key, + pinned: true, + reason: recursive + }) + } + + if (type === recursive) { + return callback(null, { + key, + pinned: false + }) + } + + // direct + if ((type === direct || type === all) && this.directPins.has(key)) { + return callback(null, { + key, + pinned: true, + reason: direct + }) + } + + if (type === direct) { + return callback(null, { + key, + pinned: false + }) + } + + // indirect (default) + // check each recursive key to see if multihash is under it + // arbitrary limit, enables handling 1000s of pins. + detectLimit(this.recursiveKeys().map(key => new CID(key)), concurrencyLimit, (cid, cb) => { + waterfall([ + (done) => this.dag.get(cid, '', { preload: false }, done), + (result, done) => done(null, result.value), + (node, done) => this.pinset.hasDescendant(node, key, done) + ], cb) + }, (err, cid) => callback(err, { + key, + pinned: Boolean(cid), + reason: cid + })) + } + + // Gets CIDs of blocks used internally by the pinner + getInternalBlocks (callback) { + this.repo.datastore.get(PIN_DS_KEY, (err, mh) => { + if (err) { + if (err.code === 'ERR_NOT_FOUND') { + this.log(`No pinned blocks`) + return callback(null, []) + } + return callback(new Error(`Could not get pin sets root from datastore: ${err.message}`)) + } + + const cid = new CID(mh) + this.dag.get(cid, '', { preload: false }, (err, obj) => { + if (err) { + return callback(new Error(`Could not get pin sets from store: ${err.message}`)) + } + + // The pinner stores an object that has two links to pin sets: + // 1. The directly pinned CIDs + // 2. The recursively pinned CIDs + // If large enough, these pin sets may have links to buckets to hold + // the pins + this.pinset.getInternalCids(obj.value, (err, cids) => { + if (err) { + return callback(new Error(`Could not get pinner internal cids: ${err.message}`)) + } + + callback(null, cids.concat(cid)) + }) + }) + }) + } + + fetchCompleteDag (cid, options, callback) { + this._walkDag({ + cid, + preload: options.preload + }, callback) + } + + // Returns an error if the pin type is invalid + static checkPinType (type) { + if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) { + return invalidPinTypeErr(type) + } + } +} + +PinManager.PinTypes = PinTypes + +module.exports = PinManager diff --git a/src/core/components/pin-set.js b/src/core/components/pin/pin-set.js similarity index 89% rename from src/core/components/pin-set.js rename to src/core/components/pin/pin-set.js index 6f3a9f98dc..04a389bf2c 100644 --- a/src/core/components/pin-set.js +++ b/src/core/components/pin/pin-set.js @@ -8,6 +8,7 @@ const varint = require('varint') const { DAGNode, DAGLink } = require('ipld-dag-pb') const multicodec = require('multicodec') const someSeries = require('async/someSeries') +const eachSeries = require('async/eachSeries') const eachOfSeries = require('async/eachOfSeries') const pbSchema = require('./pin.proto') @@ -230,15 +231,15 @@ exports = module.exports = function (dag) { dag.get(link.Hash, '', { preload: false }, (err, res) => { if (err) { return callback(err) } const keys = [] - const step = link => keys.push(link.Hash.buffer) - pinSet.walkItems(res.value, step, err => { + const stepPin = link => keys.push(link.Hash.buffer) + pinSet.walkItems(res.value, { stepPin }, err => { if (err) { return callback(err) } return callback(null, keys) }) }) }, - walkItems: (node, step, callback) => { + walkItems: (node, { stepPin = () => {}, stepBin = () => {} }, callback) => { let pbh try { pbh = readHeader(node) @@ -253,19 +254,37 @@ exports = module.exports = function (dag) { const linkHash = link.Hash.buffer if (!emptyKey.equals(linkHash)) { + stepBin(link, idx, pbh.data) + // walk the links of this fanout bin return dag.get(linkHash, '', { preload: false }, (err, res) => { if (err) { return eachCb(err) } - pinSet.walkItems(res.value, step, eachCb) + pinSet.walkItems(res.value, { stepPin, stepBin }, eachCb) }) } } else { // otherwise, the link is a pin - step(link, idx, pbh.data) + stepPin(link, idx, pbh.data) } eachCb(null) }, callback) + }, + + getInternalCids: (rootNode, callback) => { + // "Empty block" used by the pinner + const cids = [new CID(emptyKey)] + + const stepBin = link => cids.push(link.Hash) + eachSeries(rootNode.Links, (topLevelLink, cb) => { + cids.push(topLevelLink.Hash) + + dag.get(topLevelLink.Hash, '', { preload: false }, (err, res) => { + if (err) { return cb(err) } + + pinSet.walkItems(res.value, { stepBin }, cb) + }) + }, (err) => callback(err, cids)) } } return pinSet diff --git a/src/core/components/pin.proto.js b/src/core/components/pin/pin.proto.js similarity index 100% rename from src/core/components/pin.proto.js rename to src/core/components/pin/pin.proto.js diff --git a/src/core/components/repo.js b/src/core/components/repo.js index 23116d8cf5..25b7cf02ea 100644 --- a/src/core/components/repo.js +++ b/src/core/components/repo.js @@ -38,14 +38,7 @@ module.exports = function repo (self) { }) }), - gc: promisify((options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback(new Error('Not implemented')) - }), + gc: require('./pin/gc')(self), stat: promisify((options, callback) => { if (typeof options === 'function') { diff --git a/src/core/index.js b/src/core/index.js index 9ab064a607..8b0b717716 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -20,12 +20,14 @@ const EventEmitter = require('events') const config = require('./config') const boot = require('./boot') const components = require('./components') +const GCLock = require('./components/pin/gc-lock') // replaced by repo-browser when running in the browser const defaultRepo = require('./runtime/repo-nodejs') const preload = require('./preload') const mfsPreload = require('./mfs-preload') const ipldOptions = require('./runtime/ipld-nodejs') + /** * @typedef { import("./ipns/index") } IPNS */ @@ -89,6 +91,11 @@ class IPFS extends EventEmitter { this._ipns = undefined // eslint-disable-next-line no-console this._print = this._options.silent ? this.log : console.log + this._gcLock = new GCLock(this._options.repoOwner, { + // Make sure GCLock is specific to repo, for tests where there are + // multiple instances of IPFS + morticeId: this._repo.path + }) // IPFS Core exposed components // - for booting up a node diff --git a/src/http/api/resources/repo.js b/src/http/api/resources/repo.js index 998431111a..108ce6abda 100644 --- a/src/http/api/resources/repo.js +++ b/src/http/api/resources/repo.js @@ -1,9 +1,28 @@ 'use strict' -exports.gc = async (request, h) => { - const { ipfs } = request.server.app - await ipfs.repo.gc() - return h.response() +const Joi = require('@hapi/joi') + +exports.gc = { + validate: { + query: Joi.object().keys({ + 'stream-errors': Joi.boolean().default(false) + }).unknown() + }, + + async handler (request, h) { + const streamErrors = request.query['stream-errors'] + const { ipfs } = request.server.app + const res = await ipfs.repo.gc() + + const filtered = res.filter(r => !r.err || streamErrors) + const response = filtered.map(r => { + return { + Err: r.err && r.err.message, + Key: !r.err && { '/': r.cid.toString() } + } + }) + return h.response(response) + } } exports.version = async (request, h) => { diff --git a/src/http/api/routes/repo.js b/src/http/api/routes/repo.js index 21b306e51e..5f2212385c 100644 --- a/src/http/api/routes/repo.js +++ b/src/http/api/routes/repo.js @@ -12,6 +12,14 @@ module.exports = [ method: '*', path: '/api/v0/repo/stat', handler: resources.repo.stat + }, + { + method: '*', + path: '/api/v0/repo/gc', + options: { + validate: resources.repo.gc.validate + }, + handler: resources.repo.gc.handler } // TODO: implement the missing spec https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/REPO.md ] diff --git a/src/utils/mutex.js b/src/utils/mutex.js new file mode 100644 index 0000000000..b91f3139a4 --- /dev/null +++ b/src/utils/mutex.js @@ -0,0 +1,61 @@ +'use strict' + +const assert = require('assert') +const mortice = require('mortice') +const noop = () => {} + +// Wrap mortice to present a callback interface +class Mutex { + constructor (repoOwner, options = {}) { + this.mutex = mortice(options.morticeId, { + singleProcess: repoOwner + }) + + this.log = options.log || noop + this.lockId = 0 + } + + readLock (lockedFn, cb) { + return this._lock('readLock', lockedFn, cb) + } + + writeLock (lockedFn, cb) { + return this._lock('writeLock', lockedFn, cb) + } + + /** + * Request a read or write lock + * + * @param {String} type The type of lock: readLock / writeLock + * @param {function(releaseLock)} lockedFn A function that runs the locked piece of code and calls releaseLock when it completes + * @param {function(err, res)} [cb] A function that is called when the locked function completes + * @returns {void} + */ + _lock (type, lockedFn, cb = noop) { + assert(typeof lockedFn === 'function', `first argument to Mutex.${type}() must be a function`) + assert(typeof cb === 'function', `second argument to Mutex.${type}() must be a callback function`) + + const lockId = this.lockId++ + this.log(`[${lockId}] ${type} requested`) + + // mortice presents a promise based API, so we need to give it a function + // that returns a Promise. + // The function is invoked when mortice gives permission to run the locked + // piece of code + const locked = () => new Promise((resolve, reject) => { + this.log(`[${lockId}] ${type} started`) + lockedFn((err, res) => { + this.log(`[${lockId}] ${type} released`) + err ? reject(err) : resolve(res) + }) + }) + + // Get a Promise for the lock + const lock = this.mutex[type](locked) + + // When the locked piece of code is complete, the Promise resolves + return lock.then(res => cb(null, res), cb) + } +} + +module.exports = Mutex diff --git a/test/cli/gc.js b/test/cli/gc.js new file mode 100644 index 0000000000..d66c82f56d --- /dev/null +++ b/test/cli/gc.js @@ -0,0 +1,66 @@ +/* eslint-env mocha */ +'use strict' + +const sinon = require('sinon') +const YargsPromise = require('yargs-promise') +const CID = require('cids') +const cliUtils = require('../../src/cli/utils') +const cli = new YargsPromise(require('../../src/cli/parser')) + +describe('gc', () => { + const setupMocks = (cids, errMsg) => { + let gcRes = cids.map(h => ({ cid: new CID(h) })) + if (errMsg) { + gcRes = gcRes.concat([{ err: new Error(errMsg) }]) + } + + const gcFake = sinon.fake.returns(gcRes) + sinon + .stub(cliUtils, 'getIPFS') + .callsArgWith(1, null, { repo: { gc: gcFake } }) + + return sinon.stub(cliUtils, 'print') + } + + afterEach(() => { + sinon.restore() + }) + + it('gc with no flags prints errors and outputs hashes', async () => { + const cids = [ + 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u', + 'QmVc6zuAneKJzicnJpfrqCH9gSy6bz54JhcypfJYhGUFQu' + ] + const errMsg = 'some err' + const printSpy = setupMocks(cids, errMsg) + + await cli.parse(`repo gc`) + + const exp = cids.map(c => 'removed ' + c).concat(errMsg) + for (let i = 0; i < exp.length; i++) { + sinon.assert.calledWith(printSpy.getCall(i), exp[i]) + } + }) + + it('gc with --quiet prints hashes only', async () => { + const cids = [ + 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u', + 'QmVc6zuAneKJzicnJpfrqCH9gSy6bz54JhcypfJYhGUFQu' + ] + const printSpy = setupMocks(cids) + + await cli.parse(`repo gc --quiet`) + + const exp = cids.map(c => c.toString()) + for (let i = 0; i < exp.length; i++) { + sinon.assert.calledWith(printSpy.getCall(i), exp[i]) + } + }) + + it('gc with --stream-errors=false does not print errors', async () => { + const printSpy = setupMocks([], 'some err') + + await cli.parse(`repo gc --stream-errors=false`) + sinon.assert.notCalled(printSpy) + }) +}) diff --git a/test/cli/repo.js b/test/cli/repo.js index 17c04aaaa3..3c83d2d468 100644 --- a/test/cli/repo.js +++ b/test/cli/repo.js @@ -3,7 +3,6 @@ const expect = require('chai').expect const repoVersion = require('ipfs-repo').repoVersion - const runOnAndOff = require('../utils/on-and-off') describe('repo', () => runOnAndOff((thing) => { diff --git a/test/core/gc-lock.spec.js b/test/core/gc-lock.spec.js new file mode 100644 index 0000000000..d44255b1bd --- /dev/null +++ b/test/core/gc-lock.spec.js @@ -0,0 +1,301 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const pull = require('pull-stream') +const pullThrough = require('pull-stream/throughs/through') +const pullAsyncMap = require('pull-stream/throughs/async-map') +const pullCollect = require('pull-stream/sinks/collect') +const pullValues = require('pull-stream/sources/values') +const GCLock = require('../../src/core/components/pin/gc-lock') + +const cbTakeLock = (type, lock, out, id, duration) => { + return (cb) => lock[type + 'Lock']((lockCb) => { + out.push(`${type} ${id} start`) + setTimeout(() => { + out.push(`${type} ${id} end`) + lockCb() + }, duration) + }, cb) +} +const cbReadLock = (lock, out, id, duration) => { + return cbTakeLock('read', lock, out, id, duration) +} +const cbWriteLock = (lock, out, id, duration) => { + return cbTakeLock('write', lock, out, id, duration) +} +const cbTakeLockError = (type, lock, out, errs, id, duration) => { + return (cb) => lock[type + 'Lock']((lockCb) => { + out.push(`${type} ${id} start`) + setTimeout(() => { + out.push(`${type} ${id} error`) + lockCb(new Error('err')) + }, duration) + }, (err) => { + errs.push(err) + cb() + }) +} +const cbReadLockError = (lock, out, errs, id, duration) => { + return cbTakeLockError('read', lock, out, errs, id, duration) +} +const cbWriteLockError = (lock, out, errs, id, duration) => { + return cbTakeLockError('write', lock, out, errs, id, duration) +} + +const pullTakeLock = (type, lock, out, id, duration) => { + const lockFn = type === 'read' ? 'pullReadLock' : 'pullWriteLock' + const vals = ['a', 'b', 'c'] + return (cb) => { + pull( + pullValues(vals), + lock[lockFn](() => { + let started = false + return pull( + pullThrough((i) => { + if (!started) { + out.push(`${type} ${id} start`) + started = true + } + }), + pullAsyncMap((i, cb) => { + setTimeout(() => cb(null, i), duration / vals.length) + }) + ) + }), + pullCollect(() => { + out.push(`${type} ${id} end`) + cb() + }) + ) + } +} +const pullReadLock = (lock, out, id, duration) => { + return pullTakeLock('read', lock, out, id, duration) +} +const pullWriteLock = (lock, out, id, duration) => { + return pullTakeLock('write', lock, out, id, duration) +} +const pullTakeLockError = (type, lock, out, errs, id, duration) => { + const lockFn = type === 'read' ? 'pullReadLock' : 'pullWriteLock' + const vals = ['a', 'b', 'c'] + return (cb) => { + pull( + pullValues(vals), + lock[lockFn](() => { + let started = false + return pull( + pullThrough((i) => { + if (!started) { + out.push(`${type} ${id} start`) + started = true + } + }), + pullAsyncMap((i, cb) => { + setTimeout(() => cb(new Error('err')), duration) + }) + ) + }), + pullCollect((err) => { + out.push(`${type} ${id} error`) + errs.push(err) + cb() + }) + ) + } +} +const pullReadLockError = (lock, out, errs, id, duration) => { + return pullTakeLockError('read', lock, out, errs, id, duration) +} +const pullWriteLockError = (lock, out, errs, id, duration) => { + return pullTakeLockError('write', lock, out, errs, id, duration) +} + +const expectResult = (out, exp, errs, expErrCount, done) => { + if (typeof errs === 'function') { + done = errs + } + return () => { + try { + expect(out).to.eql(exp) + if (typeof expErrCount === 'number') { + expect(errs.length).to.eql(expErrCount) + for (const e of errs) { + expect(e.message).to.eql('err') + } + } + } catch (err) { + return done(err) + } + done() + } +} + +const runTests = (suiteName, { readLock, writeLock, readLockError, writeLockError }) => { + describe(suiteName, () => { + it('multiple simultaneous reads', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + readLock(lock, out, 2, 200), + readLock(lock, out, 3, 300) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 3 start', + 'read 1 end', + 'read 2 end', + 'read 3 end' + ], done)) + }) + + it('multiple simultaneous writes', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + writeLock(lock, out, 2, 200), + writeLock(lock, out, 3, 300) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'write 2 start', + 'write 2 end', + 'write 3 start', + 'write 3 end' + ], done)) + }) + + it('read then write then read', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + writeLock(lock, out, 1, 100), + readLock(lock, out, 2, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 1 end', + 'write 1 start', + 'write 1 end', + 'read 2 start', + 'read 2 end' + ], done)) + }) + + it('write then read then write', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + readLock(lock, out, 1, 100), + writeLock(lock, out, 2, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'read 1 start', + 'read 1 end', + 'write 2 start', + 'write 2 end' + ], done)) + }) + + it('two simultaneous reads then write then read', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + readLock(lock, out, 1, 100), + readLock(lock, out, 2, 200), + writeLock(lock, out, 1, 100), + readLock(lock, out, 3, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 1 end', + 'read 2 end', + 'write 1 start', + 'write 1 end', + 'read 3 start', + 'read 3 end' + ], done)) + }) + + it('two simultaneous writes then read then write', (done) => { + const lock = new GCLock() + const out = [] + parallel([ + writeLock(lock, out, 1, 100), + writeLock(lock, out, 2, 100), + readLock(lock, out, 1, 100), + writeLock(lock, out, 3, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 end', + 'write 2 start', + 'write 2 end', + 'read 1 start', + 'read 1 end', + 'write 3 start', + 'write 3 end' + ], done)) + }) + + it('simultaneous reads with error then write', (done) => { + const lock = new GCLock() + const out = [] + const errs = [] + parallel([ + readLockError(lock, out, errs, 1, 100), + readLock(lock, out, 2, 200), + writeLock(lock, out, 1, 100) + ], expectResult(out, [ + 'read 1 start', + 'read 2 start', + 'read 1 error', + 'read 2 end', + 'write 1 start', + 'write 1 end' + ], errs, 1, done)) + }) + + it('simultaneous writes with error then read', (done) => { + const lock = new GCLock() + const out = [] + const errs = [] + parallel([ + writeLockError(lock, out, errs, 1, 100), + writeLock(lock, out, 2, 100), + readLock(lock, out, 1, 100) + ], expectResult(out, [ + 'write 1 start', + 'write 1 error', + 'write 2 start', + 'write 2 end', + 'read 1 start', + 'read 1 end' + ], errs, 1, done)) + }) + }) +} + +describe('gc-lock', function () { + runTests('cb style lock', { + readLock: cbReadLock, + writeLock: cbWriteLock, + readLockError: cbReadLockError, + writeLockError: cbWriteLockError + }) + + runTests('pull stream style lock', { + readLock: pullReadLock, + writeLock: pullWriteLock, + readLockError: pullReadLockError, + writeLockError: pullWriteLockError + }) +}) diff --git a/test/core/gc.spec.js b/test/core/gc.spec.js new file mode 100644 index 0000000000..239c8816a7 --- /dev/null +++ b/test/core/gc.spec.js @@ -0,0 +1,279 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const IPFSFactory = require('ipfsd-ctl') +const pEvent = require('p-event') +const env = require('ipfs-utils/src/env') +const IPFS = require('../../src/core') + +// We need to detect when a readLock or writeLock is requested for the tests +// so we override the Mutex class to emit an event +const EventEmitter = require('events') +const Mutex = require('../../src/utils/mutex') + +class MutexEmitter extends Mutex { + constructor (repoOwner) { + super(repoOwner) + this.emitter = new EventEmitter() + } + + readLock (lockedFn, cb) { + this.emitter.emit('readLock request') + return super.readLock(lockedFn, cb) + } + + writeLock (lockedFn, cb) { + this.emitter.emit('writeLock request') + return super.writeLock(lockedFn, cb) + } +} + +describe('gc', function () { + const fixtures = [{ + path: 'test/my/path1', + content: Buffer.from('path1') + }, { + path: 'test/my/path2', + content: Buffer.from('path2') + }, { + path: 'test/my/path3', + content: Buffer.from('path3') + }, { + path: 'test/my/path4', + content: Buffer.from('path4') + }] + + let ipfsd + let ipfs + let lockEmitter + + before(function (done) { + this.timeout(40 * 1000) + + const factory = IPFSFactory.create({ type: 'proc', exec: IPFS }) + + const config = { Bootstrap: [] } + if (env.isNode) { + config.Addresses = { + Swarm: ['/ip4/127.0.0.1/tcp/0'] + } + } + + factory.spawn({ config }, (err, node) => { + expect(err).to.not.exist() + + ipfsd = node + ipfs = ipfsd.api + + // Replace the Mutex with one that emits events when a readLock or + // writeLock is requested (needed in the tests below) + ipfs._gcLock.mutex = new MutexEmitter(ipfs._options.repoOwner) + lockEmitter = ipfs._gcLock.mutex.emitter + + done() + }) + }) + + after((done) => { + ipfsd.stop(done) + }) + + const blockAddTests = [{ + name: 'add', + add1: () => ipfs.add(fixtures[0], { pin: false }), + add2: () => ipfs.add(fixtures[1], { pin: false }), + resToCid: (res) => res[0].hash + }, { + name: 'object put', + add1: () => ipfs.object.put({ Data: 'obj put 1', Links: [] }), + add2: () => ipfs.object.put({ Data: 'obj put 2', Links: [] }), + resToCid: (res) => res.toString() + }, { + name: 'block put', + add1: () => ipfs.block.put(Buffer.from('block put 1'), null), + add2: () => ipfs.block.put(Buffer.from('block put 2'), null), + resToCid: (res) => res.cid.toString() + }] + + describe('locks', function () { + for (const test of blockAddTests) { + // eslint-disable-next-line no-loop-func + it(`garbage collection should wait for pending ${test.name} to finish`, async () => { + // Add blocks to IPFS + // Note: add operation will take a read lock + const addLockRequested = pEvent(lockEmitter, 'readLock request') + const add1 = test.add1() + + // Once add lock has been requested, start GC + await addLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(lockEmitter, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second add + await gcStarted + const add2 = test.add2() + + const deleted = (await gc).map(i => i.cid.toString()) + const add1Res = test.resToCid(await add1) + const add2Res = test.resToCid(await add2) + + // Should have garbage collected blocks from first add, because GC should + // have waited for first add to finish + expect(deleted).includes(add1Res) + + // Should not have garbage collected blocks from second add, because + // second add should have waited for GC to finish + expect(deleted).not.includes(add2Res) + }) + } + + it('garbage collection should wait for pending add + pin to finish', async () => { + // Add blocks to IPFS + // Note: add operation will take a read lock + const addLockRequested = pEvent(lockEmitter, 'readLock request') + const add1 = ipfs.add(fixtures[2], { pin: true }) + + // Once add lock has been requested, start GC + await addLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(lockEmitter, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second add + await gcStarted + const add2 = ipfs.add(fixtures[3], { pin: true }) + + const deleted = (await gc).map(i => i.cid.toString()) + const add1Res = (await add1)[0].hash + const add2Res = (await add2)[0].hash + + // Should not have garbage collected blocks from first add, because GC should + // have waited for first add + pin to finish (protected by pin) + expect(deleted).not.includes(add1Res) + + // Should not have garbage collected blocks from second add, because + // second add should have waited for GC to finish + expect(deleted).not.includes(add2Res) + }) + + it('garbage collection should wait for pending block rm to finish', async () => { + // Add two blocks so that we can remove them + const cid1 = (await ipfs.block.put(Buffer.from('block to rm 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to rm 2'), null)).cid + + // Remove first block from IPFS + // Note: block rm will take a write lock + const rmLockRequested = pEvent(lockEmitter, 'writeLock request') + const rm1 = ipfs.block.rm(cid1) + + // Once rm lock has been requested, start GC + await rmLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(lockEmitter, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second rm + await gcStarted + const rm2 = ipfs.block.rm(cid2) + + const deleted = (await gc).map(i => i.cid.toString()) + await rm1 + + // Second rm should fail because GC has already removed that block + try { + await rm2 + } catch (err) { + expect(err.code).eql('ERR_DB_DELETE_FAILED') + } + + // Confirm second block has been removed + const localRefs = (await ipfs.refs.local()).map(r => r.ref) + expect(localRefs).not.includes(cid2.toString()) + + // Should not have garbage collected block from first block put, because + // GC should have waited for first rm (removing first block put) to finish + expect(deleted).not.includes(cid1.toString()) + + // Should have garbage collected block from second block put, because GC + // should have completed before second rm (removing second block put) + expect(deleted).includes(cid2.toString()) + }) + + it('garbage collection should wait for pending pin add to finish', async () => { + // Add two blocks so that we can pin them + const cid1 = (await ipfs.block.put(Buffer.from('block to pin add 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to pin add 2'), null)).cid + + // Pin first block + // Note: pin add will take a read lock + const pinLockRequested = pEvent(lockEmitter, 'readLock request') + const pin1 = ipfs.pin.add(cid1) + + // Once pin lock has been requested, start GC + await pinLockRequested + const gc = ipfs.repo.gc() + const deleted = (await gc).map(i => i.cid.toString()) + await pin1 + + // TODO: Adding pin for removed block never returns, which means the lock + // never gets released + // const pin2 = ipfs.pin.add(cid2) + + // Confirm second second block has been removed + const localRefs = (await ipfs.refs.local()).map(r => r.ref) + expect(localRefs).not.includes(cid2.toString()) + + // Should not have garbage collected block from first block put, because + // GC should have waited for pin (protecting first block put) to finish + expect(deleted).not.includes(cid1.toString()) + + // Should have garbage collected block from second block put, because GC + // should have completed before second pin + expect(deleted).includes(cid2.toString()) + }) + + it('garbage collection should wait for pending pin rm to finish', async () => { + // Add two blocks so that we can pin them + const cid1 = (await ipfs.block.put(Buffer.from('block to pin rm 1'), null)).cid + const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid + + // Pin blocks + await ipfs.pin.add(cid1) + await ipfs.pin.add(cid2) + + // Unpin first block + // Note: pin rm will take a read lock + const pinLockRequested = pEvent(lockEmitter, 'readLock request') + const pinRm1 = ipfs.pin.rm(cid1) + + // Once pin lock has been requested, start GC + await pinLockRequested + // Note: GC will take a write lock + const gcStarted = pEvent(lockEmitter, 'writeLock request') + const gc = ipfs.repo.gc() + + // Once GC has started, start second pin rm + await gcStarted + const pinRm2 = ipfs.pin.rm(cid2) + + const deleted = (await gc).map(i => i.cid.toString()) + await pinRm1 + await pinRm2 + + // Should have garbage collected block from first block put, because + // GC should have waited for pin rm (unpinning first block put) to finish + expect(deleted).includes(cid1.toString()) + + // Should not have garbage collected block from second block put, because + // GC should have completed before second block was unpinned + expect(deleted).not.includes(cid2.toString()) + }) + }) +}) diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index b8fd32a2e5..5bdbe3f853 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -157,15 +157,7 @@ describe('interface-ipfs-core tests', function () { } }) - tests.repo(defaultCommonFactory, { - skip: [ - // repo.gc - { - name: 'gc', - reason: 'TODO: repo.gc is not implemented in js-ipfs yet!' - } - ] - }) + tests.repo(defaultCommonFactory) tests.stats(defaultCommonFactory) diff --git a/test/core/pin-set.js b/test/core/pin-set.js index 3df518bc27..180c32167f 100644 --- a/test/core/pin-set.js +++ b/test/core/pin-set.js @@ -19,9 +19,10 @@ const { const CID = require('cids') const IPFS = require('../../src/core') -const createPinSet = require('../../src/core/components/pin-set') +const createPinSet = require('../../src/core/components/pin/pin-set') const createTempRepo = require('../utils/create-repo-nodejs') +const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' const defaultFanout = 256 const maxItems = 8192 @@ -186,7 +187,7 @@ describe('pinSet', function () { createNode('datum', (err, node) => { expect(err).to.not.exist() - pinSet.walkItems(node, () => {}, (err, res) => { + pinSet.walkItems(node, {}, (err, res) => { expect(err).to.exist() expect(res).to.not.exist() done() @@ -194,9 +195,33 @@ describe('pinSet', function () { }) }) + it('visits all links of a root node', function (done) { + this.timeout(90 * 1000) + + const seenPins = [] + const stepPin = (link, idx, data) => seenPins.push({ link, idx, data }) + const seenBins = [] + const stepBin = (link, idx, data) => seenBins.push({ link, idx, data }) + + createNodes(maxItems + 1, (err, nodes) => { + expect(err).to.not.exist() + + pinSet.storeSet(nodes, (err, result) => { + expect(err).to.not.exist() + + pinSet.walkItems(result.node, { stepPin, stepBin }, err => { + expect(err).to.not.exist() + expect(seenPins).to.have.length(maxItems + 1) + expect(seenBins).to.have.length(defaultFanout) + done() + }) + }) + }) + }) + it('visits all non-fanout links of a root node', function (done) { const seen = [] - const walker = (link, idx, data) => seen.push({ link, idx, data }) + const stepPin = (link, idx, data) => seen.push({ link, idx, data }) createNodes(defaultFanout, (err, nodes) => { expect(err).to.not.exist() @@ -204,7 +229,7 @@ describe('pinSet', function () { pinSet.storeSet(nodes, (err, result) => { expect(err).to.not.exist() - pinSet.walkItems(result.node, walker, err => { + pinSet.walkItems(result.node, { stepPin }, err => { expect(err).to.not.exist() expect(seen).to.have.length(defaultFanout) expect(seen[0].idx).to.eql(defaultFanout) @@ -218,4 +243,26 @@ describe('pinSet', function () { }) }) }) + + describe('getInternalCids', function () { + it('gets all links and empty key CID', function (done) { + createNodes(defaultFanout, (err, nodes) => { + expect(err).to.not.exist() + + pinSet.storeSet(nodes, (err, result) => { + expect(err).to.not.exist() + + const rootNode = DAGNode.create('pins', [{ Hash: result.cid }]) + pinSet.getInternalCids(rootNode, (err, cids) => { + expect(err).to.not.exist() + expect(cids.length).to.eql(2) + const cidStrs = cids.map(c => c.toString()) + expect(cidStrs).includes(emptyKeyHash) + expect(cidStrs).includes(result.cid.toString()) + done() + }) + }) + }) + }) + }) }) diff --git a/test/http-api/interface.js b/test/http-api/interface.js index b94004085f..a3419e4be5 100644 --- a/test/http-api/interface.js +++ b/test/http-api/interface.js @@ -152,15 +152,7 @@ describe('interface-ipfs-core over ipfs-http-client tests', () => { } })) - tests.repo(defaultCommonFactory, { - skip: [ - // repo.gc - { - name: 'gc', - reason: 'TODO: repo.gc is not implemented in js-ipfs yet!' - } - ] - }) + tests.repo(defaultCommonFactory) tests.stats(defaultCommonFactory) diff --git a/test/sharness/t0030-gc.sh b/test/sharness/t0030-gc.sh new file mode 100755 index 0000000000..599f370581 --- /dev/null +++ b/test/sharness/t0030-gc.sh @@ -0,0 +1,26 @@ +#!/bin/sh +# +# Copyright (c) 2014 Christian Couder +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Stress test Garbage Collection" + +. lib/test-lib.sh + +test_expect_success "Can add and then GC filesets" ' + export FILE_COUNT=3 + export FILE_SIZE_MB=1 + for (( i=1; i <= FILE_COUNT; i++ )); do dd if=/dev/urandom bs=$((FILE_SIZE_MB * 1048576)) count=1 of=file$i; done + export IPFS_PATH="$(pwd)/.ipfs" && + echo "IPFS_PATH: \"$IPFS_PATH\"" && + BITS="2048" && + ipfs init --bits="$BITS" >actual_init || + test_fsh cat actual_init + for (( i=1; i <= FILE_COUNT; i++ )); do ipfs add --pin=false --quiet file$i > hash$i; done + time ipfs repo gc + ipfs refs local > local-refs + for (( i=1; i <= FILE_COUNT; i++ )); do test_expect_code 1 grep `cat hash$i` local-refs; done +' + +test_done