This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

feat: Add reader to read files or part of files as streams #202

Merged · 1 commit · Mar 20, 2018

96 changes: 94 additions & 2 deletions README.md
@@ -167,7 +167,7 @@ const Exporter = require('ipfs-unixfs-engine').Exporter

### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>)

Uses the given [dag API or an ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
Uses the given [dag API] or an [ipld-resolver instance][] to fetch IPFS [UnixFS][] objects by their multiaddress.

Creates a new readable stream in object mode that outputs objects of the form

@@ -181,9 +181,101 @@ Creates a new readable stream in object mode that outputs objects of the form
Errors are received as with any normal stream: listen for the `'error'` event to be emitted.
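
A minimal sketch of this, based on the description above and assuming `cid` and `ipldResolver` are already in scope, might look like:

```js
const Exporter = require('ipfs-unixfs-engine').Exporter

const exporter = new Exporter(cid, ipldResolver)

exporter.on('error', (error) => {
  // handle the error
})

exporter.on('data', (entry) => {
  // entry is an object of the form documented above
})
```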


[IPLD Resolver]: https://github.com/ipld/js-ipld-resolver
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs

## Reader

The `reader` allows you to receive part or all of a file as a [pull-stream].

#### Reader example

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver),
  collect((error, chunks) => {
    // do something with the file chunks and/or handle errors
  })
)
```

#### Reader API

```js
const reader = require('ipfs-unixfs-engine').reader
```

### reader(<cid or ipfsPath>, <dag or ipld-resolver>, <begin>, <end>)

Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by its multiaddress.

Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver),
  drain((chunk) => {
    // do something with the file chunk
  })
)
```

#### `begin` and `end`

The `begin` and `end` arguments can optionally be passed to the reader function. They follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.

That is, `begin` is the byte index in the stream at which to start sending data and `end` is the index *before* which to stop sending data.

A negative `begin` or `end` is interpreted as an offset from the end of the stream - it is added to the total stream length, so for a 100-byte file `begin = -10` is equivalent to `begin = 90`.

See [the tests](test/reader.js) for examples of using these arguments.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  reader(cid, ipldResolver, 0, 10),
  drain((chunk) => {
    // chunk is a Buffer containing only the first 10 bytes of the stream
  })
)
```
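
A hypothetical sketch of the negative-offset behaviour described above, assuming `cid` refers to a file at least 10 bytes long:

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  // begin = -10 is treated as fileSize - 10; end defaults to the file size
  reader(cid, ipldResolver, -10),
  collect((error, chunks) => {
    // Buffer.concat(chunks) contains the last 10 bytes of the file
  })
)
```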

#### Errors

Errors are received by [pull-stream][] sinks.

```js
const reader = require('ipfs-unixfs-engine').reader
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  reader(cid, ipldResolver, 0, 10),
  collect((error, chunks) => {
    // handle the error
  })
)
```

[pull-stream]: https://www.npmjs.com/package/pull-stream
[Buffer]: https://www.npmjs.com/package/buffer
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute

Feel free to join in. All welcome. Open an [issue](https://github.com/ipfs/js-ipfs-unixfs-engine/issues)!
1 change: 1 addition & 0 deletions package.json
@@ -66,6 +66,7 @@
"lodash": "^4.17.5",
"multihashes": "~0.4.13",
"multihashing-async": "~0.4.8",
"pull-async-values": "^1.0.3",
"pull-batch": "^1.0.0",
"pull-block": "^1.4.0",
"pull-cat": "^1.1.11",
1 change: 1 addition & 0 deletions src/index.js
@@ -2,3 +2,4 @@

exports.importer = exports.Importer = require('./importer')
exports.exporter = exports.Exporter = require('./exporter')
exports.reader = exports.Reader = require('./reader')
137 changes: 137 additions & 0 deletions src/reader/index.js
@@ -0,0 +1,137 @@
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const toB58String = require('multihashes').toB58String
const waterfall = require('async/waterfall')

module.exports = (path, ipldResolver, begin = 0, end) => {
  let streamPosition = 0

  return pull(
    asyncValues((cb) => {
> Contributor: Interesting, didn't know about pull-async-values. Couldn't this be done instead with something like a `pull.values([path])` and then using a chain of `pull.map` and `pull.asyncMap`? I'm asking because this way we wouldn't need to add an extra dependency.
>
> Collaborator (author): I couldn't find an easy way to asynchronously supply an array of values to pull-stream when you don't know the length of the values ahead of time using only the built-in functions. The FAQ recommends using external modules for 1:many transforms and async steps, so I'm not sure it's possible without adding an extra dep. That said, the source for pull-async-values is pretty small - I can pull it in to this repo if required.
>
> Contributor: @achingbrain Thanks for explaining, it's fine as it is by me :)

      waterfall([
        (next) => toCid(path, next),
        (cid, next) => ipldResolver.get(cid, next),
        (node, next) => {
          const meta = UnixFS.unmarshal(node.value.data)

          if (meta.type !== 'file') {
            return next(new Error(`Path ${path} was not a file (was ${meta.type}), can only read files`))
          }

          const fileSize = meta.fileSize()

          if (!end || end > fileSize) {
            end = fileSize
          }

          if (begin < 0) {
            begin = fileSize + begin
          }

          if (end < 0) {
            end = fileSize + end
          }

          const links = node.value.links

          if (!links || !links.length) {
            if (meta.data && meta.data.length) {
              // file was small enough to fit in one DAGNode so has no links
              return next(null, [(done) => done(null, meta.data)])
            }

            return next(new Error(`Path ${path} had no links or data`))
          }

          const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
          const overhead = (linkedDataSize - meta.fileSize()) / links.length

          // create an array of functions to fetch link data
          next(null, links.map((link) => (done) => {
            // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
            // larger than they actually are due to the protobuf wrapper
            const bytesInLinkedObjectData = link.size - overhead

            if (begin > (streamPosition + bytesInLinkedObjectData)) {
              // Start byte is after this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            if (end < streamPosition) {
              // End byte was before this block so skip it
              streamPosition += bytesInLinkedObjectData

              return done()
            }

            // transform the multihash to a cid, the cid to a node and the node to some data
            waterfall([
              (next) => toCid(link.multihash, next),
              (cid, next) => ipldResolver.get(cid, next),
              (node, next) => next(null, node.value.data),
              (data, next) => next(null, UnixFS.unmarshal(data).data)
            ], done)
          }))
        }
      ], cb)
    }),
    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
    pull.filter(Boolean),
    map((data) => {
      const block = extractDataFromBlock(data, streamPosition, begin, end)

      streamPosition += data.length

      return block
    })
  )
}

function toCid (input, callback) {
  let path = input
  let cid

  try {
    if (Buffer.isBuffer(path)) {
      path = toB58String(path)
    }

    if (path.indexOf('/ipfs/') === 0) {
      path = path.substring('/ipfs/'.length)
    }

    if (path.charAt(path.length - 1) === '/') {
      path = path.substring(0, path.length - 1)
    }

    cid = new CID(path)
  } catch (error) {
    return callback(new Error(`Path '${input}' was invalid: ${error.message}`))
  }

  callback(null, cid)
}

function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // If the end byte is in the current block, truncate the block to the end byte
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // If the start byte is in the current block, skip to the start byte
    block = block.slice(begin - streamPosition)
  }

  return block
}
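
For context on the review thread above, a minimal sketch (not part of this PR) of how `pull-async-values` is used as a source, based on how the reader uses it: the supplied function calls back once with an array whose length need not be known in advance, and each element is then emitted downstream.

```js
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')

pull(
  // the source calls back once with an array of values
  asyncValues((cb) => {
    setTimeout(() => cb(null, ['a', 'b', 'c']), 10)
  }),
  // each element of the array arrives downstream as a separate value
  pull.collect((error, values) => {
    console.log(error, values) // null [ 'a', 'b', 'c' ]
  })
)
```
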
3 changes: 3 additions & 0 deletions test/browser.js
@@ -60,6 +60,9 @@ describe('IPFS data importing tests on the Browser', function () {
  // require('./exporter')(repo)
  // require('./exporter-subtree')(repo)

  // Reader
  require('./reader')(repo)

  // Other
  require('./import-export')(repo)
  require('./import-export-nested-dir')(repo)
3 changes: 3 additions & 0 deletions test/node.js
@@ -61,6 +61,9 @@ describe('IPFS UnixFS Engine', () => {
  require('./exporter')(repo)
  require('./exporter-subtree')(repo)

  // Reader
  require('./reader')(repo)

  // Other
  require('./import-export')(repo)
  require('./import-export-nested-dir')(repo)