This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

feat: Add reader to read files or part of files as streams
achingbrain committed Mar 11, 2018
1 parent bd7624d commit 66be6b3
Showing 7 changed files with 477 additions and 2 deletions.
README.md: 94 additions & 2 deletions
@@ -167,7 +167,7 @@ const Exporter = require('ipfs-unixfs-engine').Exporter

### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>)

Uses the given [dag API or an ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
Uses the given [dag API][] or an [ipld-resolver instance][] to fetch IPFS [UnixFS][] objects by their CID or IPFS path.

Creates a new readable stream in object mode that outputs objects of the form

@@ -181,9 +181,101 @@
Errors are received as with any normal stream: listen for the `'error'` event.
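A minimal sketch of wiring up the handlers (not from the original README; it assumes `cid` and `ipldResolver` are in scope, as in the examples above):

```js
const Exporter = require('ipfs-unixfs-engine').Exporter

const exporter = new Exporter(cid, ipldResolver)

exporter.on('error', (error) => {
  // handle the error
})

exporter.on('data', (file) => {
  // file objects have the shape described above
})
```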


[IPLD Resolver]: https://github.com/ipld/js-ipld-resolver
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs

## Reader

The `reader` allows you to receive part or all of a file as a [pull-stream].

#### Reader example

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver),
  collect((error, chunks) => {
    // do something with the file chunks and/or handle errors
  })
)
```

#### Reader API

```js
const reader = require('ipfs-unixfs-engine').reader
```

### reader(<cid or ipfsPath>, <dag or ipld-resolver>, <begin>, <end>)

Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by its CID or IPFS path.

Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver),
  drain((chunk) => {
    // do something with the file chunk
  })
)
```

#### `begin` and `end`

`begin` and `end` arguments can optionally be passed to the reader function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.

That is, `begin` is the index in the stream at which to start sending data, and `end` is the index *before* which to stop sending data.

A negative `begin` or `end` is treated as an offset from the end of the stream: it is added to the total stream length to produce the effective index.

See [the tests](test/reader.js) for examples of using these arguments.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver, 0, 10),
  drain((chunk) => {
    // together the chunks contain only the first 10 bytes of the stream
  })
)
```
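
Negative indices count back from the end of the stream. A sketch under the same assumptions (`cid` and `ipldResolver` in scope) that reads the final 10 bytes:

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver, -10),
  collect((error, chunks) => {
    // concatenated, the chunks are the last 10 bytes of the stream
  })
)
```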

#### Errors

Errors are received by [pull-stream][] sinks.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver, 0, 10),
  collect((error, chunks) => {
    // handle the error
  })
)
```

[pull-stream]: https://www.npmjs.com/package/pull-stream
[Buffer]: https://www.npmjs.com/package/buffer
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute

Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-unixfs-engine/issues)!
package.json: 1 addition & 0 deletions
@@ -66,6 +66,7 @@
"lodash": "^4.17.5",
"multihashes": "~0.4.13",
"multihashing-async": "~0.4.8",
"pull-async-values": "^1.0.3",
"pull-batch": "^1.0.0",
"pull-block": "^1.4.0",
"pull-cat": "^1.1.11",
src/index.js: 1 addition & 0 deletions
@@ -2,3 +2,4 @@

exports.importer = exports.Importer = require('./importer')
exports.exporter = exports.Exporter = require('./exporter')
exports.reader = exports.Reader = require('./reader')
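
After this change the module exposes three entry points. A minimal sketch of consuming them (the lower-case and capitalised names refer to the same functions):

```js
const { importer, exporter, reader } = require('ipfs-unixfs-engine')
```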
src/reader/index.js: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const toB58String = require('multihashes').toB58String
const waterfall = require('async/waterfall')

module.exports = (path, ipldResolver, begin = 0, end) => {
  let streamPosition = 0

  return pull(
    asyncValues((cb) => {
      waterfall([
        (next) => toCid(path, next),
        (cid, next) => ipldResolver.get(cid, next),
        (node, next) => {
          const meta = UnixFS.unmarshal(node.value.data)

          if (meta.type !== 'file') {
            return next(new Error(`Path ${path} was not a file (was ${meta.type}), can only read files`))
          }

          const fileSize = meta.fileSize()

          if (!end || end > fileSize) {
            end = fileSize
          }

          if (begin < 0) {
            begin = fileSize + begin
          }

          if (end < 0) {
            end = fileSize + end
          }

          const links = node.value.links

          if (!links || !links.length) {
            if (meta.data && meta.data.length) {
              // file was small enough to fit in one DAGNode so has no links
              return next(null, [(done) => done(null, meta.data)])
            }

            return next(new Error(`Path ${path} had no links or data`))
          }

          const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
          const overhead = (linkedDataSize - meta.fileSize()) / links.length

          // create an array of functions to fetch link data
          next(null, links.map((link) => (done) => {
            // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
            // larger than they actually are due to the protobuf wrapper
            const bytesInLinkedObjectData = link.size - overhead

            if (begin > (streamPosition + bytesInLinkedObjectData)) {
              // Start byte is after this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            if (end < streamPosition) {
              // End byte was before this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            // transform the multihash to a cid, the cid to a node and the node to some data
            waterfall([
              (next) => toCid(link.multihash, next),
              (cid, next) => ipldResolver.get(cid, next),
              (node, next) => next(null, node.value.data),
              (data, next) => next(null, UnixFS.unmarshal(data).data)
            ], done)
          }))
        }
      ], cb)
    }),
    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
    pull.filter(Boolean),
    map((data) => {
      const block = extractDataFromBlock(data, streamPosition, begin, end)

      streamPosition += data.length

      return block
    })
  )
}

function toCid (input, callback) {
  let path = input

  try {
    if (Buffer.isBuffer(path)) {
      path = toB58String(path)
    }

    if (path.indexOf('/ipfs/') === 0) {
      path = path.substring('/ipfs/'.length)
    }

    if (path.charAt(path.length - 1) === '/') {
      path = path.substring(0, path.length - 1)
    }

    callback(null, new CID(path))
  } catch (error) {
    callback(new Error(`Path '${input}' was invalid: ${error.message}`))
  }
}

function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // If the end byte is in the current block, truncate the block to the end byte
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // If the start byte is in the current block, skip to the start byte
    block = block.slice(begin - streamPosition)
  }

  return block
}
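
To make the slicing arithmetic concrete, here is a hypothetical walk-through of `extractDataFromBlock` (not part of the commit):

```js
// a 20-byte block that starts at streamPosition 0, read with begin = 5, end = 12
const block = Buffer.from('abcdefghijklmnopqrst')

// end - streamPosition = 12 < 20, so the block is first truncated to its
// first 12 bytes; begin = 5 falls inside the block, so the first 5 bytes
// are then skipped, leaving bytes 5..11 of the stream
const slice = extractDataFromBlock(block, 0, 5, 12)

console.log(slice.toString()) // 'fghijkl'
```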
test/browser.js: 3 additions & 0 deletions
@@ -60,6 +60,9 @@ describe('IPFS data importing tests on the Browser', function () {
// require('./exporter')(repo)
// require('./exporter-subtree')(repo)

// Reader
require('./reader')(repo)

// Other
require('./import-export')(repo)
require('./import-export-nested-dir')(repo)
test/node.js: 3 additions & 0 deletions
@@ -61,6 +61,9 @@ describe('IPFS UnixFS Engine', () => {
require('./exporter')(repo)
require('./exporter-subtree')(repo)

// Reader
require('./reader')(repo)

// Other
require('./import-export')(repo)
require('./import-export-nested-dir')(repo)