diff --git a/README.md b/README.md
index ae8b3aed..78af7621 100644
--- a/README.md
+++ b/README.md
@@ -167,7 +167,7 @@ const Exporter = require('ipfs-unixfs-engine').Exporter
 
 ### new Exporter(<hash>, <dag or ipld-resolver>)
 
-Uses the given [dag API or an ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
+Uses the given [dag API][] or an [ipld-resolver instance][] to fetch IPFS [UnixFS][] objects by their multiaddress.
 
 Creates a new readable stream in object mode that outputs objects of the form
 
@@ -181,9 +181,101 @@ Creates a new readable stream in object mode that outputs objects of the form
 Errors are received as with a normal stream, by listening on the `'error'` event to be emitted.
 
-[IPLD Resolver]: https://github.com/ipld/js-ipld-resolver
+[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
+[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
 [UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
 
+## Reader
+
+The `reader` allows you to receive part or all of a file as a [pull-stream][].
+
+#### Reader example
+
+```js
+const reader = require('ipfs-unixfs-engine').reader
+const pull = require('pull-stream')
+const collect = require('pull-stream/sinks/collect')
+
+pull(
+  reader(cid, ipldResolver),
+  collect((error, chunks) => {
+    // do something with the file chunks and/or handle errors
+  })
+)
+```
+
+#### Reader API
+
+```js
+const reader = require('ipfs-unixfs-engine').reader
+```
+
+### reader(<cid or ipfsPath>, <ipldResolver>, <begin>, <end>)
+
+Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by its multiaddress.
+
+Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.
+
+```js
+const reader = require('ipfs-unixfs-engine').reader
+const pull = require('pull-stream')
+const drain = require('pull-stream/sinks/drain')
+
+pull(
+  reader(cid, ipldResolver),
+  drain((chunk) => {
+    // do something with the file chunk
+  })
+)
+```
+
+#### `begin` and `end`
+
+`begin` and `end` arguments can optionally be passed to the reader function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.
+
+That is: `begin` is the index in the stream at which to start sending data and `end` is the index *before* which to stop sending data.
+
+A negative `begin` is treated as an offset from the end of the stream. A negative `end` stops sending data that many bytes before the end of the stream (the negative value is added to the total stream length).
+
+See [the tests](test/reader.js) for examples of using these arguments.
+
+```js
+const reader = require('ipfs-unixfs-engine').reader
+const pull = require('pull-stream')
+const drain = require('pull-stream/sinks/drain')
+
+pull(
+  reader(cid, ipldResolver, 0, 10),
+  drain((chunk) => {
+    // chunk is a Buffer containing only the first 10 bytes of the stream
+  })
+)
+```
+
+#### Errors
+
+Errors are received by [pull-stream][] sinks.
+
+```js
+const reader = require('ipfs-unixfs-engine').reader
+const pull = require('pull-stream')
+const collect = require('pull-stream/sinks/collect')
+
+pull(
+  reader(cid, ipldResolver, 0, 10),
+  collect((error, chunks) => {
+    // handle the error
+  })
+)
+```
+
+[pull-stream]: https://www.npmjs.com/package/pull-stream
+[Buffer]: https://www.npmjs.com/package/buffer
+[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
+[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
+[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
+[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice
+
 ## Contribute
 
 Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-unixfs-engine/issues)!
diff --git a/package.json b/package.json
index daf10e55..29e2139a 100644
--- a/package.json
+++ b/package.json
@@ -66,6 +66,7 @@
     "lodash": "^4.17.5",
     "multihashes": "~0.4.13",
     "multihashing-async": "~0.4.8",
+    "pull-async-values": "^1.0.3",
     "pull-batch": "^1.0.0",
     "pull-block": "^1.4.0",
     "pull-cat": "^1.1.11",
diff --git a/src/index.js b/src/index.js
index 9ca42824..740aae8b 100644
--- a/src/index.js
+++ b/src/index.js
@@ -2,3 +2,4 @@
 
 exports.importer = exports.Importer = require('./importer')
 exports.exporter = exports.Exporter = require('./exporter')
+exports.reader = exports.Reader = require('./reader')
diff --git a/src/reader/index.js b/src/reader/index.js
new file mode 100644
index 00000000..c9f7ddf8
--- /dev/null
+++ b/src/reader/index.js
@@ -0,0 +1,134 @@
+'use strict'
+
+const CID = require('cids')
+const pull = require('pull-stream')
+const asyncValues = require('pull-async-values')
+const asyncMap = require('pull-stream/throughs/async-map')
+const map = require('pull-stream/throughs/map')
+const UnixFS = require('ipfs-unixfs')
+const toB58String = require('multihashes').toB58String
+const waterfall = require('async/waterfall')
+
+module.exports = (path, ipldResolver, begin = 0, end) => {
+  let streamPosition = 0
+
+  return pull(
+    asyncValues((cb) => {
+      waterfall([
+        (next) => toCid(path, next),
+        (cid, next) => ipldResolver.get(cid, next),
+        (node, next) => {
+          const meta = UnixFS.unmarshal(node.value.data)
+
+          if (meta.type !== 'file') {
+            return next(new Error(`Path ${path} was not a file (was ${meta.type}), can only read files`))
+          }
+
+          const fileSize = meta.fileSize()
+
+          if (!end || end > fileSize) {
+            end = fileSize
+          }
+
+          if (begin < 0) {
+            begin = fileSize + begin
+          }
+
+          if (end < 0) {
+            end = fileSize + end
+          }
+
+          const links = node.value.links
+
+          if (!links || !links.length) {
+            if (meta.data && meta.data.length) {
+              // file was small enough to fit in one DAGNode so has no links
+              return next(null, [(done) => done(null, meta.data)])
+            }
+
+            return next(new Error(`Path ${path} had no links or data`))
+          }
+
+          const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
+          const overhead = (linkedDataSize - meta.fileSize()) / links.length
+
+          // create an array of functions to fetch link data
+          next(null, links.map((link) => (done) => {
+            // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
+            // larger than they actually are due to the protobuf wrapper
+            const bytesInLinkedObjectData = link.size - overhead
+
+            if (begin > (streamPosition + bytesInLinkedObjectData)) {
+              // Start byte is after this block so skip it
+              streamPosition += bytesInLinkedObjectData
+
+              return done()
+            }
+
+            if (end < streamPosition) {
+              // End byte was before this block so skip it
+              streamPosition += bytesInLinkedObjectData
+
+              return done()
+            }
+
+            // transform the multihash to a cid, the cid to a node and the node to some data
+            waterfall([
+              (next) => toCid(link.multihash, next),
+              (cid, next) => ipldResolver.get(cid, next),
+              (node, next) => next(null, node.value.data),
+              (data, next) => next(null, UnixFS.unmarshal(data).data)
+            ], done)
+          }))
+        }
+      ], cb)
+    }),
+    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
+    pull.filter(Boolean),
+    map((data) => {
+      const block = extractDataFromBlock(data, streamPosition, begin, end)
+
+      streamPosition += data.length
+
+      return block
+    })
+  )
+}
+
+function toCid (input, callback) {
+  let path = input
+
+  try {
+    if (Buffer.isBuffer(path)) {
+      path = toB58String(path)
+    }
+
+    if (path.indexOf('/ipfs/') === 0) {
+      path = path.substring('/ipfs/'.length)
+    }
+
+    if (path.charAt(path.length - 1) === '/') {
+      path = path.substring(0, path.length - 1)
+    }
+
+    callback(null, new CID(path))
+  } catch (error) {
+    callback(new Error(`Path '${input}' was invalid: ${error.message}`))
+  }
+}
+
+function extractDataFromBlock (block, streamPosition, begin, end) {
+  const blockLength = block.length
+
+  if (end - streamPosition < blockLength) {
+    // If the end byte is in the current block, truncate the block to the end byte
+    block = block.slice(0, end - streamPosition)
+  }
+
+  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
+    // If the start byte is in the current block, skip to the start byte
+    block = block.slice(begin - streamPosition)
+  }
+
+  return block
+}
diff --git a/test/browser.js b/test/browser.js
index 6cf3280f..760d2a24 100644
--- a/test/browser.js
+++ b/test/browser.js
@@ -60,6 +60,9 @@ describe('IPFS data importing tests on the Browser', function () {
   // require('./exporter')(repo)
   // require('./exporter-subtree')(repo)
 
+  // Reader
+  require('./reader')(repo)
+
   // Other
   require('./import-export')(repo)
   require('./import-export-nested-dir')(repo)
diff --git a/test/node.js b/test/node.js
index 8bc82510..315235c5 100644
--- a/test/node.js
+++ b/test/node.js
@@ -61,6 +61,9 @@ describe('IPFS UnixFS Engine', () => {
   require('./exporter')(repo)
   require('./exporter-subtree')(repo)
 
+  // Reader
+  require('./reader')(repo)
+
   // Other
   require('./import-export')(repo)
   require('./import-export-nested-dir')(repo)
diff --git a/test/reader.js b/test/reader.js
new file mode 100644
index 00000000..54a7866e
--- /dev/null
+++ b/test/reader.js
@@ -0,0 +1,241 @@
+/* eslint-env mocha */
+'use strict'
+
+const reader = require('../src').reader
+
+const chai = require('chai')
+chai.use(require('dirty-chai'))
+const expect = chai.expect
+const BlockService = require('ipfs-block-service')
+const Ipld = require('ipld')
+const pull = require('pull-stream')
+const values = require('pull-stream/sources/values')
+const collect = require('pull-stream/sinks/collect')
+const importer = require('./../src').importer
+
+module.exports = (repo) => {
+  describe('reader', function () {
+    let ipld
+
+    function addAndReadTestFile ({file, begin, end, strategy = 'balanced'}, cb) {
+      pull(
+        values([{
+          path: '/foo',
+          content: file
+        }]),
+        importer(ipld, {
+          strategy
+        }),
+        collect((error, nodes) => {
+          expect(error).to.not.exist()
+          expect(nodes.length).to.be.eql(1)
+
+          pull(
+            reader(nodes[0].multihash, ipld, begin, end),
+            collect((error, results) => {
+              cb(error, Buffer.concat(results))
+            })
+          )
+        })
+      )
+    }
+
+    function checkBytesThatSpanBlocks (strategy, cb) {
+      const bytesInABlock = 262144
+      const bytes = Buffer.alloc(bytesInABlock + 100, 0)
+
+      bytes[bytesInABlock - 1] = 1
+      bytes[bytesInABlock] = 2
+      bytes[bytesInABlock + 1] = 3
+
+      addAndReadTestFile({
+        file: bytes,
+        begin: bytesInABlock - 1,
+        end: bytesInABlock + 2,
+        strategy
+      }, (error, data) => {
+        if (error) {
+          return cb(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([1, 2, 3]))
+
+        cb()
+      })
+    }
+
+    before(() => {
+      const bs = new BlockService(repo)
+      ipld = new Ipld(bs)
+    })
+
+    it('fails on invalid path', (done) => {
+      pull(
+        reader('?!?', ipld),
+        collect((error) => {
+          expect(error.message).to.contain("Path '?!?' was invalid: Non-base58 character")
+
+          done()
+        })
+      )
+    })
+
+    it('fails on non-file', (done) => {
+      addAndReadTestFile({
+        file: undefined
+      }, (error) => {
+        expect(error.message).to.contain('was not a file')
+
+        done()
+      })
+    })
+
+    it('fails on file with no links', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([])
+      }, (error) => {
+        expect(error.message).to.contain('had no links')
+
+        done()
+      })
+    })
+
+    it('reads bytes with a begin', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3]),
+        begin: 1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([1, 2, 3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with a negative begin', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3]),
+        begin: -1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with an end', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3]),
+        begin: 0,
+        end: 1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([0]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with a negative end', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3, 4]),
+        begin: 2,
+        end: -1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([2, 3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with a begin and an end', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3, 4]),
+        begin: 1,
+        end: 4
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([1, 2, 3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with a negative begin and a negative end that point to the same byte', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3, 4]),
+        begin: -1,
+        end: -1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with a negative begin and a negative end', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3, 4]),
+        begin: -2,
+        end: -1
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes to the end of the file when end is larger than the file', (done) => {
+      addAndReadTestFile({
+        file: Buffer.from([0, 1, 2, 3]),
+        begin: 0,
+        end: 100
+      }, (error, data) => {
+        if (error) {
+          return done(error)
+        }
+
+        expect(data).to.deep.equal(Buffer.from([0, 1, 2, 3]))
+
+        done()
+      })
+    })
+
+    it('reads bytes with an offset and a length that span blocks using balanced layout', (done) => {
+      checkBytesThatSpanBlocks('balanced', done)
+    })
+
+    it('reads bytes with an offset and a length that span blocks using flat layout', (done) => {
+      checkBytesThatSpanBlocks('flat', done)
+    })
+
+    it('reads bytes with an offset and a length that span blocks using trickle layout', (done) => {
+      checkBytesThatSpanBlocks('trickle', done)
+    })
+  })
+}