Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
chore: refactor to async/await
Browse files Browse the repository at this point in the history
BREAKING CHANGE: This module used to export a class that extended EventEmitter,
now it exports a function that returns an async iterable.

I also updated the deps to use the latest http api, though it's removed
the ability to add whole paths at once, along with some special logic
to handle symlinks.  The `Dicer` module that this module depends on
will still emit events for when it encounters symlinks so I left the
handlers in though am unsure if we actually use them.
  • Loading branch information
achingbrain committed Aug 14, 2019
1 parent edc2f72 commit 79c916f
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 326 deletions.
35 changes: 17 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,30 @@ npm install ipfs-multipart
## Usage
```javascript
const http = require('http')
const IPFSMultipart = require('ipfs-multipart')
const parser = require('ipfs-multipart')

http.createServer((req, res) => {
http.createServer(async (req, res) => {
if (req.method === 'POST' && req.headers['content-type']) {
const parser = IPFSMultipart.reqParser(req)

parser.on('file', (fileName, fileStream) => {
console.log(`file ${fileName} start`)
for await (const entry of parser(req)) {
if (entry.type === 'directory') {
console.log(`dir ${entry.name} start`)
}

fileStream.on('data', (data) => {
console.log(`file ${fileName} contents:`, data.toString())
})
if (entry.type === 'file') {
console.log(`file ${entry.name} start`)

fileStream.on('end', (data) => {
console.log(`file ${fileName} end`)
})
})
for await (const data of entry.content) {
console.log(`file ${entry.name} contents:`, data.toString())
}

parser.on('end', () => {
console.log('finished parsing')
res.writeHead(200)
res.end()
})
console.log(`file ${entry.name} end`)
}
}

return
console.log('finished parsing')
res.writeHead(200)
res.end()
}

res.writeHead(404)
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
},
"dependencies": {
"@hapi/content": "^4.1.0",
"dicer": "~0.3.0"
"dicer": "~0.3.0",
"event-iterator": "^1.2.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"chai": "^4.2.0",
"ipfs-api": "github:ipfs/js-ipfs-api#1fd9749",
"ipfs-api": "^26.1.2",
"request": "^2.88.0"
},
"engines": {
Expand Down
26 changes: 11 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
'use strict'

const content = require('@hapi/content')
const Parser = require('./parser')
const parser = require('./parser')

module.exports = {
Parser,
/**
* Request Parser
*
* @param {Object} req - Request
* @returns {Parser}
*/
reqParser: (req) => {
const boundary = content.type(req.headers['content-type']).boundary
const parser = new Parser({ boundary: boundary })
req.pipe(parser)
return parser
}
/**
* Request Parser
*
* @param {Object} req - Request
* @returns {Object} an async iterable
*/
module.exports = function (req) {
const boundary = content.type(req.headers['content-type']).boundary

return parser(boundary, req.payload || req)
}
138 changes: 71 additions & 67 deletions src/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,32 @@

const Dicer = require('dicer')
const Content = require('@hapi/content')
const stream = require('stream')
const util = require('util')
const Transform = stream.Transform
const {
EventIterator
} = require('event-iterator')

const multipartFormdataType = 'multipart/form-data'
const applicationDirectory = 'application/x-directory'
const applicationSymlink = 'application/symlink'

function subscribe (emitter, dataEvent = 'data', endEvent = 'end', evOptions = {}) {
return new EventIterator((push, stop, fail) => {
emitter.addListener(dataEvent, push)
emitter.addListener(endEvent, stop)
emitter.addListener('error', fail)
}, (push, stop, fail) => {
emitter.removeListener(dataEvent, push)
emitter.removeListener(endEvent, stop)
emitter.removeListener('error', fail)

if (typeof emitter.destroy === 'function') {
emitter.destroy()
} else if (typeof emitter.close === 'function') {
emitter.close()
}
}, evOptions)
}

const isDirectory = (mediatype) => mediatype === multipartFormdataType || mediatype === applicationDirectory

const parseDisposition = (disposition) => {
Expand All @@ -29,75 +47,61 @@ const parseHeader = (header) => {
const disposition = parseDisposition(header['content-disposition'][0])

const details = type
details.name = disposition.name
details.name = decodeURIComponent(disposition.name)
details.type = disposition.type

return details
}

/**
* Parser
*
* @constructor
* @param {Object} options
* @returns {Parser}
*/
function Parser (options) {
// allow use without new
if (!(this instanceof Parser)) {
return new Parser(options)
}

this.dicer = new Dicer({ boundary: options.boundary })

this.dicer.on('part', (part) => this.handlePart(part))

this.dicer.on('error', (err) => this.emit('err', err))

this.dicer.on('finish', () => {
this.emit('finish')
this.emit('end')
})

Transform.call(this, options)
}
util.inherits(Parser, Transform)

Parser.prototype._transform = function (chunk, enc, cb) {
this.dicer.write(chunk, enc)
cb()
}

Parser.prototype._flush = function (cb) {
this.dicer.end()
cb()
}

Parser.prototype.handlePart = function (part) {
part.on('header', (header) => {
const partHeader = parseHeader(header)

if (isDirectory(partHeader.mime)) {
part.on('data', () => false)
this.emit('directory', partHeader.name)
return
async function * parser (boundary, stream) {
const dicer = new Dicer({ boundary })
stream.pipe(dicer)

for await (const part of subscribe(dicer, 'part', 'finish')) {
for await (const header of subscribe(part, 'header')) {
const partHeader = parseHeader(header)

if (isDirectory(partHeader.mime)) {
// consume data from stream so we move on to the next header/part
part.on('data', () => {})

yield {
type: 'directory',
name: partHeader.name
}

continue
}

if (partHeader.mime === applicationSymlink) {
// consume data from stream so we move on to the next header/part
part.on('data', () => {})

yield {
type: 'symlink',
name: partHeader.name,
target: partHeader.target
}

continue
}

if (partHeader.boundary) {
// recursively parse nested multiparts
for await (const entry of parser(partHeader.boundary, part)) {
yield entry
}

continue
}

yield {
type: 'file',
name: partHeader.name,
content: subscribe(part)
}
}

if (partHeader.mime === applicationSymlink) {
part.on('data', (target) => this.emit('symlink', partHeader.name, target.toString()))
return
}

if (partHeader.boundary) {
// recursively parse nested multiparts
const parser = new Parser({ boundary: partHeader.boundary })
parser.on('file', (file) => this.emit('file', file))
part.pipe(parser)
return
}

this.emit('file', partHeader.name, part)
})
}
}

module.exports = Parser
module.exports = parser
14 changes: 0 additions & 14 deletions test/node.js

This file was deleted.

Loading

0 comments on commit 79c916f

Please sign in to comment.