From eaa8dae80d5c460683dc09fc8804c2f661f9de9e Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 17 Oct 2019 12:33:20 +0100 Subject: [PATCH] feat: EXPERIMENTAL ipfsx API - boot procedure and add API method This PR allows ipfsx to be used by calling `IPFS.create(options)` with `{ EXPERIMENTAL: { ipfsx: true } }` options. It adds a single API method `add` that returns an iterator that yields objects of the form `{ cid, path, size }`. The iterator is decorated with a `first` and `last` function so users can conveniently `await` on the first or last item to be yielded as per the [proposal here](https://github.com/ipfs-shipyard/ipfsx/blob/master/API.md#add). In order to boot up a new ipfsx node I refactored the boot procedure to enable the following: 1. **Remove the big stateful blob "`self`" - components are passed just the dependencies they need to operate.** Right now it is opaque as to which components require which parts of an IPFS node without inspecting the entirety of the component's code. This change makes it easier to look at a component and know what aspects of the IPFS stack it uses and consequently allows us to understand which APIs should be available at which points of the node's lifecycle. It makes the code easier to understand, more maintainable and easier to mock dependencies for unit tests. 1. **Restrict APIs to appropriate lifecycle stage(s).** This PR introduces an `ApiManager` that allows us to update the API that is exposed at any given point. It allows us to (for example) disallow `ipfs.add` before the node is initialized or access `libp2p` before the node is started. The lifecycle methods `init`, `start` and `stop` each define which API methods are available after they have run avoiding having to put boilerplate in every method to check if it can be called when the node is in a particular state. See #1438 1. **Safer and more flexible API usage.** The `ApiManager` allows us to temporarily change APIs to stop `init` from being called again while it is already running and has the facility to rollback to the previous API state if an operation fails. It also enables piggybacking so we don't attempt 2 or more concurrent start/stop calls at once. See #1061 #2257 1. **Enable config changes at runtime.** Having an API that can be updated during a node's lifecycle will enable this feature in the future. **FEEDBACK REQUIRED**: The changes I've made here are a little...racy. They have a bunch of benefits, as I've outlined above but the `ApiManager` is implemented as a `Proxy`, allowing us to swap out the underlying API at will. How do y'all feel about that? Is there a better way or got a suggestion? resolves #1438 resolves #1061 resolves #2257 refs #2509 refs #1670 License: MIT Signed-off-by: Alan Shaw --- package.json | 1 + src/core/api-manager.js | 21 ++ src/core/components-ipfsx/add/index.js | 122 +++++++ src/core/components-ipfsx/add/utils.js | 87 +++++ src/core/components-ipfsx/index.js | 15 + src/core/components-ipfsx/init.js | 302 ++++++++++++++++++ src/core/components-ipfsx/start.js | 131 ++++++++ src/core/components-ipfsx/stop.js | 107 +++++++ .../files-regular/add-async-iterator.js | 159 +-------- src/core/components/files-regular/utils.js | 83 ----- src/core/components/pin.js | 2 +- src/core/errors.js | 19 ++ src/core/index.js | 8 +- src/core/ipfsx.js | 45 +++ src/core/preload.js | 4 +- src/core/runtime/init-assets-browser.js | 1 + src/core/runtime/init-assets-nodejs.js | 15 + src/core/utils.js | 22 ++ test/core/interface.spec.js | 7 + 19 files changed, 917 insertions(+), 234 deletions(-) create mode 100644 src/core/api-manager.js create mode 100644 src/core/components-ipfsx/add/index.js create mode 100644 src/core/components-ipfsx/add/utils.js create mode 100644 src/core/components-ipfsx/index.js create mode 100644 src/core/components-ipfsx/init.js create mode 100644 src/core/components-ipfsx/start.js create mode 100644 src/core/components-ipfsx/stop.js create mode 100644 src/core/errors.js create mode 100644 src/core/ipfsx.js create mode 100644 src/core/runtime/init-assets-browser.js create mode 100644 src/core/runtime/init-assets-nodejs.js diff --git a/package.json b/package.json index 0300c5b8d1..6d9a098b16 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "main": "src/core/index.js", "browser": { "./src/core/components/init-assets.js": false, + "./src/core/runtime/init-assets-nodejs.js": "./src/core/runtime/init-assets-browser.js", "./src/core/runtime/add-from-fs-nodejs.js": "./src/core/runtime/add-from-fs-browser.js", "./src/core/runtime/config-nodejs.js": "./src/core/runtime/config-browser.js", "./src/core/runtime/dns-nodejs.js": "./src/core/runtime/dns-browser.js", diff --git a/src/core/api-manager.js b/src/core/api-manager.js new file mode 100644 index 0000000000..8acc3c9f44 --- /dev/null +++ b/src/core/api-manager.js @@ -0,0 +1,21 @@ +module.exports = class ApiManager { + constructor () { + this._api = {} + this._onUndef = () => undefined + this.api = new Proxy({}, { + get (target, prop) { + return target[prop] === undefined + ? this._onUndef(prop) + : target[prop] + } + }) + } + + update (nextApi, onUndef) { + const prevApi = this._api + const prevUndef = this._onUndef + this._api = nextApi + if (onUndef) this._onUndef = onUndef + return { cancel: () => this.update(prevApi, prevUndef), api: this.api } + } +} diff --git a/src/core/components-ipfsx/add/index.js b/src/core/components-ipfsx/add/index.js new file mode 100644 index 0000000000..7ed62899bc --- /dev/null +++ b/src/core/components-ipfsx/add/index.js @@ -0,0 +1,122 @@ +'use strict' + +const importer = require('ipfs-unixfs-importer') +const normaliseAddInput = require('ipfs-utils/src/files/normalise-input') +const { parseChunkerString } = require('./utils') +const pipe = require('it-pipe') +const { withFirstAndLast } = require('../../utils') + +module.exports = ({ ipld, dag, gcLock, preload, pin, constructorOptions }) => { + return withFirstAndLast(async function * add (source, options) { + options = options || {} + + const opts = { + shardSplitThreshold: constructorOptions.EXPERIMENTAL.sharding ? 1000 : Infinity, + ...options, + strategy: 'balanced', + ...parseChunkerString(options.chunker) + } + + // CID v0 is for multihashes encoded with sha2-256 + if (opts.hashAlg && opts.cidVersion !== 1) { + opts.cidVersion = 1 + } + + if (opts.trickle) { + opts.strategy = 'trickle' + } + + delete opts.trickle + + if (opts.progress) { + let total = 0 + const prog = opts.progress + + opts.progress = (bytes) => { + total += bytes + prog(total) + } + } + + const iterator = pipe( + normaliseAddInput(source), + source => importer(source, ipld, opts), + transformFile(dag, opts), + preloadFile(preload, opts), + pinFile(pin, opts) + ) + + const releaseLock = await gcLock.readLock() + + try { + yield * iterator + } finally { + releaseLock() + } + }) +} + +function transformFile (dag, opts) { + return async function * (source) { + for await (const { cid, path, unixfs } of source) { + if (opts.onlyHash) { + yield { + cid, + path: path || cid.toString(), + size: unixfs.fileSize() + } + + continue + } + + const node = await dag.get(cid, { ...opts, preload: false }) + + yield { + cid, + path: path || cid.toString(), + size: Buffer.isBuffer(node) ? node.length : node.size + } + } + } +} + +function preloadFile (preload, opts) { + return async function * (source) { + for await (const file of source) { + const isRootFile = !file.path || opts.wrapWithDirectory + ? file.path === '' + : !file.path.includes('/') + + const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false + + if (shouldPreload) { + preload(file.hash) + } + + yield file + } + } +} + +function pinFile (pin, opts) { + return async function * (source) { + for await (const file of source) { + // Pin a file if it is the root dir of a recursive add or the single file + // of a direct add. + const pin = 'pin' in opts ? opts.pin : true + const isRootDir = !file.path.includes('/') + const shouldPin = pin && isRootDir && !opts.onlyHash + + if (shouldPin) { + // Note: addAsyncIterator() has already taken a GC lock, so tell + // pin.add() not to take a (second) GC lock + await pin.add(file.hash, { + preload: false, + lock: false + }) + } + + yield file + } + } +} diff --git a/src/core/components-ipfsx/add/utils.js b/src/core/components-ipfsx/add/utils.js new file mode 100644 index 0000000000..5c3ee6cc2a --- /dev/null +++ b/src/core/components-ipfsx/add/utils.js @@ -0,0 +1,87 @@ +'use strict' + +/** + * Parses chunker string into options used by DAGBuilder in ipfs-unixfs-engine + * + * + * @param {String} chunker Chunker algorithm supported formats: + * "size-{size}" + * "rabin" + * "rabin-{avg}" + * "rabin-{min}-{avg}-{max}" + * + * @return {Object} Chunker options for DAGBuilder + */ +const parseChunkerString = (chunker) => { + if (!chunker) { + return { + chunker: 'fixed' + } + } else if (chunker.startsWith('size-')) { + const sizeStr = chunker.split('-')[1] + const size = parseInt(sizeStr) + if (isNaN(size)) { + throw new Error('Chunker parameter size must be an integer') + } + return { + chunker: 'fixed', + chunkerOptions: { + maxChunkSize: size + } + } + } else if (chunker.startsWith('rabin')) { + return { + chunker: 'rabin', + chunkerOptions: parseRabinString(chunker) + } + } else { + throw new Error(`Unrecognized chunker option: ${chunker}`) + } +} + +/** + * Parses rabin chunker string + * + * @param {String} chunker Chunker algorithm supported formats: + * "rabin" + * "rabin-{avg}" + * "rabin-{min}-{avg}-{max}" + * + * @return {Object} rabin chunker options + */ +const parseRabinString = (chunker) => { + const options = {} + const parts = chunker.split('-') + switch (parts.length) { + case 1: + options.avgChunkSize = 262144 + break + case 2: + options.avgChunkSize = parseChunkSize(parts[1], 'avg') + break + case 4: + options.minChunkSize = parseChunkSize(parts[1], 'min') + options.avgChunkSize = parseChunkSize(parts[2], 'avg') + options.maxChunkSize = parseChunkSize(parts[3], 'max') + break + default: + throw new Error('Incorrect chunker format (expected "rabin" "rabin-[avg]" or "rabin-[min]-[avg]-[max]"') + } + + return options +} + +const parseChunkSize = (str, name) => { + const size = parseInt(str) + if (isNaN(size)) { + throw new Error(`Chunker parameter ${name} must be an integer`) + } + + return size +} + +module.exports = { + parseChunkSize, + parseRabinString, + parseChunkerString +} diff --git a/src/core/components-ipfsx/index.js b/src/core/components-ipfsx/index.js new file mode 100644 index 0000000000..f9a2ee0109 --- /dev/null +++ b/src/core/components-ipfsx/index.js @@ -0,0 +1,15 @@ +'use strict' + +module.exports = { + add: require('./add'), + init: require('./init'), + start: require('./start'), + stop: require('./stop'), + legacy: { + config: require('../components/config'), + dag: require('../components/dag'), + libp2p: require('../components/libp2p'), + object: require('../components/object'), + pin: require('../components/pin') + } +} diff --git a/src/core/components-ipfsx/init.js b/src/core/components-ipfsx/init.js new file mode 100644 index 0000000000..5e5acda3b5 --- /dev/null +++ b/src/core/components-ipfsx/init.js @@ -0,0 +1,302 @@ +'use strict' + +const log = require('debug')('ipfs:components:init') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const mergeOptions = require('merge-options') +const promisify = require('promisify-es6') +const getDefaultConfig = require('../runtime/config-nodejs.js') +const createRepo = require('../runtime/repo-nodejs') +const Keychain = require('libp2p-keychain') +const NoKeychain = require('../components/no-keychain') +const GCLock = require('../components/pin/gc-lock') +const { DAGNode } = require('ipld-dag-pb') +const UnixFs = require('ipfs-unixfs') +const multicodec = require('multicodec') +const multiaddr = require('multiaddr') +const { + ERR_ALREADY_INITIALIZING, + ERR_ALREADY_INITIALIZED, + ERR_NOT_STARTED +} = require('../../errors') +const BlockService = require('ipfs-block-service') +const Ipld = require('ipld') +const getDefaultIpldOptions = require('../runtime/ipld-nodejs') +const createPreloader = require('./preload') +const { ERR_REPO_NOT_INITIALIZED } = require('ipfs-repo').errors +const IPNS = require('../ipns') +const OfflineDatastore = require('../ipns/routing/offline-datastore') +const initAssets = require('../runtime/init-assets-nodejs') +const Components = require('.') +const PinManager = require('../components/pin/pin-manager') + +module.exports = ({ + apiManager, + print, + constructorOptions +}) => async function init (options) { + const { cancel } = apiManager.update({ init: ERR_ALREADY_INITIALIZING }) + + try { + options = mergeOptions({}, options, constructorOptions.init) + + if (constructorOptions.pass) { + options.pass = constructorOptions.pass + } + + if (constructorOptions.config) { + options.config = constructorOptions.config + } + + const repo = typeof options.repo === 'string' || options.repo == null + ? createRepo(options.repo) + : options.repo + + let isInitialized = true + + if (repo.closed) { + try { + await repo.open() + } catch (err) { + if (err.code === ERR_REPO_NOT_INITIALIZED) { + isInitialized = false + } else { + throw err + } + } + } + + const { peerId, config, keychain } = isInitialized + ? await initExistingRepo(repo, options) + : await initNewRepo(repo, options) + + log('peer created') + const peerInfo = new PeerInfo(peerId) + + if (config.Addresses && config.Addresses.Swarm) { + config.Addresses.Swarm.forEach(addr => { + let ma = multiaddr(addr) + + if (ma.getPeerId()) { + ma = ma.encapsulate(`/p2p/${peerInfo.id.toB58String()}`) + } + + peerInfo.multiaddrs.add(ma) + }) + } + + const blockService = new BlockService(repo) + const ipld = new Ipld(getDefaultIpldOptions(blockService, constructorOptions.ipld, log)) + + const preload = createPreloader(constructorOptions.preload) + await preload.start() + + const gcLock = new GCLock(constructorOptions.repoOwner, { + // Make sure GCLock is specific to repo, for tests where there are + // multiple instances of IPFS + morticeId: repo.path + }) + + const dag = Components.legacy.dag({ _ipld: ipld, _preload: preload }) + const object = Components.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) + + const pinManager = new PinManager(repo, dag) + await pinManager.load() + + const pin = Components.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const add = Components.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) + + if (!isInitialized && !options.emptyRepo) { + // add empty unixfs dir object (go-ipfs assumes this exists) + const emptyDirCid = await addEmptyDir({ dag }) + + log('adding default assets') + await initAssets({ add, print }) + + log('initializing IPNS keyspace') + // Setup the offline routing for IPNS. + // This is primarily used for offline ipns modifications, such as the initializeKeyspace feature. + const offlineDatastore = new OfflineDatastore(repo) + const ipns = new IPNS(offlineDatastore, repo.datastore, peerInfo, keychain, { pass: options.pass }) + await ipns.initializeKeyspace(peerId.privKey.bytes, emptyDirCid.toString()) + } + + const api = createApi({ + add, + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions: options, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo + }) + + apiManager.update(api, ERR_NOT_STARTED) + } catch (err) { + cancel() + throw err + } + + return apiManager.api +} + +async function initNewRepo (repo, { privateKey, emptyRepo, bits, profiles, config, pass, print }) { + emptyRepo = emptyRepo || false + bits = bits == null ? 2048 : Number(bits) + + config = mergeOptions(getDefaultConfig(), config) + config = applyProfiles(profiles, config) + + // Verify repo does not exist yet + const exists = await repo.exists() + log('repo exists?', exists) + + if (exists === true) { + throw new Error('repo already exists') + } + + const peerId = await createPeerId({ privateKey, bits, print }) + let keychain = new NoKeychain() + + log('identity generated') + + config.Identity = { + PeerID: peerId.toB58String(), + PrivKey: peerId.privKey.bytes.toString('base64') + } + + privateKey = peerId.privKey + + config.Keychain = Keychain.generateOptions() + + log('peer identity: %s', config.Identity.PeerID) + + await repo.init(config) + await repo.open() + + log('repo opened') + + if (pass) { + log('creating keychain') + const keychainOptions = { passPhrase: pass, ...config.Keychain } + keychain = new Keychain(repo.keys, keychainOptions) + await keychain.importPeer('self', { privKey: privateKey }) + } + + return { peerId, keychain, config } +} + +async function initExistingRepo (repo, { config: newConfig, profiles, pass }) { + let config = await repo.config.get() + + if (newConfig || profiles) { + if (newConfig) { + config = mergeOptions(config, newConfig) + } + if (profiles) { + config = applyProfiles(profiles, config) + } + await repo.config.set(config) + } + + let keychain = new NoKeychain() + + if (pass) { + const keychainOptions = { passPhrase: pass, ...config.Keychain } + keychain = new Keychain(repo.keys, keychainOptions) + log('keychain constructed') + } + + const peerId = await promisify(PeerId.createFromPrivKey)(config.Identity.PrivKey) + + // Import the private key as 'self', if needed. + if (pass) { + try { + await keychain.findKeyByName('self') + } catch (err) { + log('Creating "self" key') + await keychain.importPeer('self', peerId) + } + } + + return { peerId, keychain, config } +} + +function createPeerId ({ privateKey, bits, print }) { + if (privateKey) { + log('using user-supplied private-key') + return typeof privateKey === 'object' + ? privateKey + : promisify(PeerId.createFromPrivKey)(Buffer.from(privateKey, 'base64')) + } else { + // Generate peer identity keypair + transform to desired format + add to config. + print('generating %s-bit RSA keypair...', bits) + return promisify(PeerId.create)({ bits }) + } +} + +async function addEmptyDir ({ dag }) { + const node = new DAGNode(new UnixFs('directory').marshal()) + return dag.put(node, { + version: 0, + format: multicodec.DAG_PB, + hashAlg: multicodec.SHA2_256 + }) +} + +// Apply profiles (e.g. ['server', 'lowpower']) to config +function applyProfiles (profiles, config) { + return (profiles || []).reduce((name, config) => { + const profile = Components.legacy.config.profiles[name] + if (!profile) { + throw new Error(`No profile with name '${name}'`) + } + log('applying profile %s', name) + return profile.transform(config) + }) +} + +function createApi ({ + add, + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo +}) { + const start = Components.start({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo + }) + + const api = { + add, + init: ERR_ALREADY_INITIALIZED, + start + } + + return api +} diff --git a/src/core/components-ipfsx/start.js b/src/core/components-ipfsx/start.js new file mode 100644 index 0000000000..53a3714b64 --- /dev/null +++ b/src/core/components-ipfsx/start.js @@ -0,0 +1,131 @@ +'use strict' + +const Bitswap = require('ipfs-bitswap') +const PeerBook = require('peer-book') +const IPNS = require('../ipns') +const routingConfig = require('../ipns/routing/config') +const defer = require('p-defer') +const { ERR_ALREADY_INITIALIZED } = require('../../errors') + +const Components = require('.') + +module.exports = ({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo +}) => async function start () { + const startPromise = defer() + const { cancel } = apiManager.update({ start: () => startPromise.promise }) + + try { + // The repo may be closed if previously stopped + if (repo.closed) { + await repo.open() + } + + const config = await repo.config.get() + + const peerBook = new PeerBook() + const libp2p = Components.legacy.libp2p({ + _options: constructorOptions, + _repo: repo, + _peerInfo: peerInfo, + _peerInfoBook: peerBook + }, config) + + await libp2p.start() + + const ipnsRouting = routingConfig({ + _options: constructorOptions, + libp2p, + _repo: repo, + _peerInfo: peerInfo + }) + const ipns = new IPNS(ipnsRouting, repo.datastore, peerInfo, keychain, { pass: initOptions.pass }) + const bitswap = new Bitswap(libp2p, repo.blocks, { statsEnabled: true }) + + await bitswap.start() + + blockService.setExchange(bitswap) + + await preload.start() + await ipns.republisher.start() + // TODO: start mfs preload here + + const api = createApi({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo + }) + + apiManager.update(api, () => undefined) + } catch (err) { + cancel() + startPromise.reject(err) + throw err + } + + startPromise.resolve(apiManager.api) + return apiManager.api +} + +function createApi ({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo +}) { + const dag = Components.legacy.dag({ _ipld: ipld, _preload: preload }) + const object = Components.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) + const pin = Components.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const add = Components.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) + + const stop = Components.stop({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + preload, + print, + repo + }) + + const api = { + add, + init: ERR_ALREADY_INITIALIZED, + start: () => apiManager.api, + stop + } + + return api +} diff --git a/src/core/components-ipfsx/stop.js b/src/core/components-ipfsx/stop.js new file mode 100644 index 0000000000..cc6455f24d --- /dev/null +++ b/src/core/components-ipfsx/stop.js @@ -0,0 +1,107 @@ +'use strict' + +const defer = require('p-defer') +const Components = require('.') +const { ERR_NOT_STARTED, ERR_ALREADY_INITIALIZED } = require('../../errors') + +module.exports = ({ + apiManager, + constructorOptions, + bitswap, + blockService, + gcLock, + initOptions, + ipld, + ipns, + keychain, + libp2p, + peerInfo, + pinManager, + preload, + print, + repo +}) => async function stop () { + const stopPromise = defer() + const { cancel } = apiManager.update({ stop: () => stopPromise.promise }) + + try { + blockService.unsetExchange() + bitswap.stop() + preload.stop() + + await Promise.all([ + ipns.republisher.stop(), + // mfsPreload.stop(), + libp2p.stop(), + repo.close() + ]) + + const api = createApi({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo + }) + + apiManager.update(api, ERR_NOT_STARTED) + } catch (err) { + cancel() + stopPromise.reject(err) + throw err + } + + stopPromise.resolve(apiManager.api) + return apiManager.api +} + +function createApi ({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo +}) { + const dag = Components.legacy.dag({ _ipld: ipld, _preload: preload }) + const object = Components.legacy.object({ _ipld: ipld, _preload: preload, dag, _gcLock: gcLock }) + const pin = Components.legacy.pin({ _ipld: ipld, _preload: preload, object, _repo: repo, _pinManager: pinManager }) + const add = Components.add({ ipld, dag, preload, pin, gcLock, constructorOptions }) + + const start = Components.start({ + apiManager, + constructorOptions, + blockService, + gcLock, + initOptions, + ipld, + keychain, + peerInfo, + pinManager, + preload, + print, + repo + }) + + const api = { + add, + init: ERR_ALREADY_INITIALIZED, + start, + stop: () => apiManager.api + } + + return api +} diff --git a/src/core/components/files-regular/add-async-iterator.js b/src/core/components/files-regular/add-async-iterator.js index e138a1cd66..f69d7268f6 100644 --- a/src/core/components/files-regular/add-async-iterator.js +++ b/src/core/components/files-regular/add-async-iterator.js @@ -1,155 +1,24 @@ 'use strict' -const importer = require('ipfs-unixfs-importer') -const normaliseAddInput = require('ipfs-utils/src/files/normalise-input') -const { parseChunkerString } = require('./utils') -const pipe = require('it-pipe') -const log = require('debug')('ipfs:add') -log.error = require('debug')('ipfs:add:error') - -function noop () {} +const createAdd = require('../../components-ipfsx/add') module.exports = function (self) { - // Internal add func that gets used by all add funcs - return async function * addAsyncIterator (source, options) { - options = options || {} - - const chunkerOptions = parseChunkerString(options.chunker) - - const opts = Object.assign({}, { - shardSplitThreshold: self._options.EXPERIMENTAL.sharding - ? 1000 - : Infinity - }, options, { - strategy: 'balanced', - chunker: chunkerOptions.chunker, - chunkerOptions: chunkerOptions.chunkerOptions - }) - - // CID v0 is for multihashes encoded with sha2-256 - if (opts.hashAlg && opts.cidVersion !== 1) { - opts.cidVersion = 1 - } - - if (opts.trickle) { - opts.strategy = 'trickle' - } - - delete opts.trickle - - let total = 0 - - const prog = opts.progress || noop - const progress = (bytes) => { - total += bytes - prog(total) - } - - opts.progress = progress - - const iterator = pipe( - normaliseAddInput(source), - doImport(self, opts), - transformFile(self, opts), - preloadFile(self, opts), - pinFile(self, opts) - ) - - const releaseLock = await self._gcLock.readLock() - - try { - yield * iterator - } finally { - releaseLock() - } - } -} - -function doImport (ipfs, opts) { - return async function * (source) { // eslint-disable-line require-await - yield * importer(source, ipfs._ipld, opts) - } -} - -function transformFile (ipfs, opts) { - return async function * (source) { - for await (const file of source) { - let cid = file.cid - const hash = cid.toBaseEncodedString() - let path = file.path ? file.path : hash - - if (opts.wrapWithDirectory && !file.path) { - path = '' - } - - if (opts.onlyHash) { - yield { - path, - hash, - size: file.unixfs.fileSize() - } - - return - } - - const node = await ipfs.object.get(file.cid, Object.assign({}, opts, { preload: false })) - - if (opts.cidVersion === 1) { - cid = cid.toV1() - } - - let size = node.size - - if (Buffer.isBuffer(node)) { - size = node.length - } - - yield { - path, - hash, - size - } - } - } -} - -function preloadFile (ipfs, opts) { - return async function * (source) { - for await (const file of source) { - const isRootFile = !file.path || opts.wrapWithDirectory - ? file.path === '' - : !file.path.includes('/') + const { + _ipld: ipld, + dag, + _gcLock: gcLock, + _preload: preload, + pin, + _options: config + } = self - const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false + const add = createAdd({ ipld, dag, gcLock, preload, pin, config }) - if (shouldPreload) { - ipfs._preload(file.hash) - } - - yield file - } - } -} - -function pinFile (ipfs, opts) { - return async function * (source) { - for await (const file of source) { - // Pin a file if it is the root dir of a recursive add or the single file - // of a direct add. - const pin = 'pin' in opts ? opts.pin : true - const isRootDir = !file.path.includes('/') - const shouldPin = pin && isRootDir && !opts.onlyHash && !opts.hashAlg - - if (shouldPin) { - // Note: addAsyncIterator() has already taken a GC lock, so tell - // pin.add() not to take a (second) GC lock - await ipfs.pin.add(file.hash, { - preload: false, - lock: false - }) - } + return async function * addAsyncIterator (source, options) { + options = options || {} - yield file + for await (const file of add(source, options)) { + yield { hash: file.cid.toString(), ...file } } } } diff --git a/src/core/components/files-regular/utils.js b/src/core/components/files-regular/utils.js index 876d0b0d48..0a5c8a57ba 100644 --- a/src/core/components/files-regular/utils.js +++ b/src/core/components/files-regular/utils.js @@ -20,86 +20,6 @@ const normalizePath = (path) => { return path } -/** - * Parses chunker string into options used by DAGBuilder in ipfs-unixfs-engine - * - * - * @param {String} chunker Chunker algorithm supported formats: - * "size-{size}" - * "rabin" - * "rabin-{avg}" - * "rabin-{min}-{avg}-{max}" - * - * @return {Object} Chunker options for DAGBuilder - */ -const parseChunkerString = (chunker) => { - if (!chunker) { - return { - chunker: 'fixed' - } - } else if (chunker.startsWith('size-')) { - const sizeStr = chunker.split('-')[1] - const size = parseInt(sizeStr) - if (isNaN(size)) { - throw new Error('Chunker parameter size must be an integer') - } - return { - chunker: 'fixed', - chunkerOptions: { - maxChunkSize: size - } - } - } else if (chunker.startsWith('rabin')) { - return { - chunker: 'rabin', - chunkerOptions: parseRabinString(chunker) - } - } else { - throw new Error(`Unrecognized chunker option: ${chunker}`) - } -} - -/** - * Parses rabin chunker string - * - * @param {String} chunker Chunker algorithm supported formats: - * "rabin" - * "rabin-{avg}" - * "rabin-{min}-{avg}-{max}" - * - * @return {Object} rabin chunker options - */ -const parseRabinString = (chunker) => { - const options = {} - const parts = chunker.split('-') - switch (parts.length) { - case 1: - options.avgChunkSize = 262144 - break - case 2: - options.avgChunkSize = parseChunkSize(parts[1], 'avg') - break - case 4: - options.minChunkSize = parseChunkSize(parts[1], 'min') - options.avgChunkSize = parseChunkSize(parts[2], 'avg') - options.maxChunkSize = parseChunkSize(parts[3], 'max') - break - default: - throw new Error('Incorrect chunker format (expected "rabin" "rabin-[avg]" or "rabin-[min]-[avg]-[max]"') - } - - return options -} - -const parseChunkSize = (str, name) => { - const size = parseInt(str) - if (isNaN(size)) { - throw new Error(`Chunker parameter ${name} must be an integer`) - } - - return size -} - const mapFile = (file, options) => { options = options || {} @@ -129,8 +49,5 @@ const mapFile = (file, options) => { module.exports = { normalizePath, - parseChunkSize, - parseRabinString, - parseChunkerString, mapFile } diff --git a/src/core/components/pin.js b/src/core/components/pin.js index cbe0c8a250..176e5f5cc8 100644 --- a/src/core/components/pin.js +++ b/src/core/components/pin.js @@ -10,7 +10,7 @@ const PinTypes = PinManager.PinTypes module.exports = (self) => { const dag = self.dag - const pinManager = new PinManager(self._repo, dag) + const pinManager = self._pinManager || new PinManager(self._repo, dag) const pin = { add: callbackify.variadic(async (paths, options) => { diff --git a/src/core/errors.js b/src/core/errors.js new file mode 100644 index 0000000000..b938eff2d6 --- /dev/null +++ b/src/core/errors.js @@ -0,0 +1,19 @@ +const errCode = require('err-code') + +exports.ERR_NOT_INITIALIZED = () => { + throw errCode(new Error('not initialized'), 'ERR_NOT_INITIALIZED') +} + +exports.ERR_ALREADY_INITIALIZING = () => { + const msg = 'cannot initialize an initializing node' + throw errCode(new Error(msg), 'ERR_ALREADY_INITIALIZING') +} + +exports.ERR_ALREADY_INITIALIZED = () => { + const msg = 'cannot re-initialize an initialized node' + throw errCode(new Error(msg), 'ERR_ALREADY_INITIALIZED') +} + +exports.ERR_NOT_STARTED = () => { + throw errCode(new Error('not started'), 'ERR_NOT_STARTED') +} diff --git a/src/core/index.js b/src/core/index.js index a5ad33edf9..17a3ff99de 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -28,14 +28,13 @@ const preload = require('./preload') const mfsPreload = require('./mfs-preload') const ipldOptions = require('./runtime/ipld-nodejs') const { isTest } = require('ipfs-utils/src/env') +const ipfsx = require('./ipfsx') /** * @typedef { import("./ipns/index") } IPNS */ /** - * - * * @class IPFS * @extends {EventEmitter} */ @@ -86,7 +85,7 @@ class IPFS extends EventEmitter { this._bitswap = undefined this._blockService = new BlockService(this._repo) this._ipld = new Ipld(ipldOptions(this._blockService, this._options.ipld, this.log)) - this._preload = preload(this) + this._preload = preload(this._options.preload) this._mfsPreload = mfsPreload(this) /** @type {IPNS} */ this._ipns = undefined @@ -177,5 +176,8 @@ module.exports.createNode = (options) => { } module.exports.create = (options) => { + if (options && options.EXPERIMENTAL && options.EXPERIMENTAL.ipfsx) { + return ipfsx(options) + } return new IPFS(options).ready } diff --git a/src/core/ipfsx.js b/src/core/ipfsx.js new file mode 100644 index 0000000000..25d7352457 --- /dev/null +++ b/src/core/ipfsx.js @@ -0,0 +1,45 @@ +'use strict' + +const log = require('debug')('ipfs') +const mergeOptions = require('merge-options') +const { isTest } = require('ipfs-utils/src/env') +const { ERR_NOT_INITIALIZED } = require('./errors') +const { validate } = require('./config') +const Components = require('./components-ipfsx') +const ApiManager = require('./api-manager') + +const getDefaultOptions = () => ({ + init: true, + start: true, + EXPERIMENTAL: {}, + preload: { + enabled: !isTest, // preload by default, unless in test env + addresses: [ + '/dns4/node0.preload.ipfs.io/https', + '/dns4/node1.preload.ipfs.io/https' + ] + } +}) + +module.exports = async options => { + options = mergeOptions(getDefaultOptions(), validate(options || {})) + + // eslint-disable-next-line no-console + const print = options.silent ? log : console.log + + const apiManager = new ApiManager() + const init = Components.init({ apiManager, print, constructorOptions: options }) + const { api } = apiManager.update({ init }, ERR_NOT_INITIALIZED) + + if (!options.init) { + return api + } + + await api.init() + + if (!options.start) { + return api + } + + return api.start() +} diff --git a/src/core/preload.js b/src/core/preload.js index 5427a2ecd0..053103c648 100644 --- a/src/core/preload.js +++ b/src/core/preload.js @@ -10,8 +10,8 @@ const preload = require('./runtime/preload-nodejs') const log = debug('ipfs:preload') log.error = debug('ipfs:preload:error') -module.exports = self => { - const options = self._options.preload || {} +module.exports = options => { + options = options || {} options.enabled = Boolean(options.enabled) options.addresses = options.addresses || [] diff --git a/src/core/runtime/init-assets-browser.js b/src/core/runtime/init-assets-browser.js new file mode 100644 index 0000000000..0c0c42d5b5 --- /dev/null +++ b/src/core/runtime/init-assets-browser.js @@ -0,0 +1 @@ +module.exports = () => {} diff --git a/src/core/runtime/init-assets-nodejs.js b/src/core/runtime/init-assets-nodejs.js new file mode 100644 index 0000000000..d8c4665f42 --- /dev/null +++ b/src/core/runtime/init-assets-nodejs.js @@ -0,0 +1,15 @@ +'use strict' + +const path = require('path') +const globSource = require('ipfs-utils/src/files/glob-source') +const all = require('async-iterator-all') + +// Add the default assets to the repo. +module.exports = async function initAssets ({ add, print }) { + const initDocsPath = path.join(__dirname, '..', '..', 'init-files', 'init-docs') + const results = await all(add(globSource(initDocsPath))) + const dir = results.filter(file => file.path === 'init-docs').pop() + + print('to get started, enter:\n') + print(`\tjsipfs cat /ipfs/${dir.cid}/readme\n`) +} diff --git a/src/core/utils.js b/src/core/utils.js index 8373797dde..2b852410da 100644 --- a/src/core/utils.js +++ b/src/core/utils.js @@ -127,3 +127,25 @@ const resolvePath = async function (objectAPI, ipfsPaths) { exports.normalizePath = normalizePath exports.parseIpfsPath = parseIpfsPath exports.resolvePath = resolvePath + +exports.withFirstAndLast = fn => { + return (...args) => { + const it = fn(...args) + return { + [Symbol.asyncIterator] () { + return it[Symbol.asyncIterator]() + }, + async first () { + const { value } = await it.next() + return value + }, + async last () { + let last + for await (const value of it) { + last = value + } + return last + } + } + } +} diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index bfc0cb6508..0b882fdf01 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -51,6 +51,13 @@ describe('interface-ipfs-core tests', function () { }] }) + tests.filesRegular.ipfsx(CommonFactory.create({ + spawnOptions: { + initOptions: { bits: 512, profile: 'test' }, + EXPERIMENTAL: { ipfsx: true } + } + })) + tests.filesMFS(defaultCommonFactory) tests.key(CommonFactory.createAsync({