Skip to content

Commit

Permalink
feat: http upload/download progress handlers (#54)
Browse files Browse the repository at this point in the history
As per #52 this adds `onUploadProgress` / `onDownloadProgress` optional handlers. Intention is to allow ipfs-webui to render file upload progress when new content is added.

Fixes #52

Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
Gozala and achingbrain authored Aug 12, 2020
1 parent 78ad2d2 commit d30be96
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 23 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"dist"
],
"browser": {
"./src/http/fetch.js": "./src/http/fetch.browser.js",
"./src/text-encoder.js": "./src/text-encoder.browser.js",
"./src/text-decoder.js": "./src/text-decoder.browser.js",
"./src/temp-dir.js": "./src/temp-dir.browser.js",
Expand Down Expand Up @@ -44,15 +45,15 @@
"merge-options": "^2.0.0",
"nanoid": "^3.1.3",
"node-fetch": "^2.6.0",
"stream-to-it": "^0.2.0"
"stream-to-it": "^0.2.0",
"it-to-stream": "^0.1.2"
},
"devDependencies": {
"aegir": "^25.0.0",
"delay": "^4.3.0",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
"it-last": "^1.0.2",
"it-to-stream": "^0.1.2"
"it-last": "^1.0.2"
},
"contributors": [
"Hugo Dias <hugomrdias@gmail.com>",
Expand All @@ -63,4 +64,4 @@
"Irakli Gozalishvili <contact@gozala.io>",
"Marcin Rataj <lidel@lidel.org>"
]
}
}
23 changes: 4 additions & 19 deletions src/http.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,14 @@
/* eslint-disable no-undef */
'use strict'

const fetch = require('node-fetch')
const { fetch, Request, Headers } = require('./http/fetch')
const { TimeoutError, HTTPError } = require('./http/error')
const merge = require('merge-options').bind({ ignoreUndefined: true })
const { URL, URLSearchParams } = require('iso-url')
const TextDecoder = require('./text-decoder')
const AbortController = require('abort-controller')
const anySignal = require('any-signal')

const Request = fetch.Request
const Headers = fetch.Headers

class TimeoutError extends Error {
constructor () {
super('Request timed out')
this.name = 'TimeoutError'
}
}

class HTTPError extends Error {
constructor (response) {
super(response.statusText)
this.name = 'HTTPError'
this.response = response
}
}

const timeout = (promise, ms, abortController) => {
if (ms === undefined) {
return promise
Expand Down Expand Up @@ -87,6 +70,8 @@ const defaults = {
* @prop {function(URLSearchParams): URLSearchParams } [transformSearchParams]
* @prop {function(any): any} [transform] - When iterating the response body, transform each chunk with this function.
* @prop {function(Response): Promise<void>} [handleError] - Handle errors
* @prop {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress] - Can be passed to track upload progress
* @prop {function({total:number, loaded:number, lengthComputable:boolean}):void} [onDownloadProgress] - Can be passed to track download progress
*/

class HTTP {
Expand Down
26 changes: 26 additions & 0 deletions src/http/error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict'

class TimeoutError extends Error {
constructor (message = 'Request timed out') {
super(message)
this.name = 'TimeoutError'
}
}
exports.TimeoutError = TimeoutError

class AbortError extends Error {
constructor (message = 'The operation was aborted.') {
super(message)
this.name = 'AbortError'
}
}
exports.AbortError = AbortError

class HTTPError extends Error {
constructor (response) {
super(response.statusText)
this.name = 'HTTPError'
this.response = response
}
}
exports.HTTPError = HTTPError
124 changes: 124 additions & 0 deletions src/http/fetch.browser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use strict'
/* eslint-env browser */

const { TimeoutError, AbortError } = require('./error')

/**
* @typedef {RequestInit & ExtraFetchOptions} FetchOptions
* @typedef {Object} ExtraFetchOptions
* @property {number} [timeout]
* @property {URLSearchParams} [searchParams]
* @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress]
* @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onDownloadProgress]
* @property {string} [overrideMimeType]
* @returns {Promise<Response>}
*/

/**
* @param {string|URL} url
* @param {FetchOptions} [options]
* @returns {Promise<Response>}
*/
const fetch = (url, options = {}) => {
const request = new XMLHttpRequest()
request.open(options.method || 'GET', url.toString(), true)

const { timeout } = options
if (timeout > 0 && timeout < Infinity) {
request.timeout = options.timeout
}

if (options.overrideMimeType != null) {
request.overrideMimeType(options.overrideMimeType)
}

if (options.headers) {
for (const [name, value] of options.headers.entries()) {
request.setRequestHeader(name, value)
}
}

if (options.signal) {
options.signal.onabort = () => request.abort()
}

if (options.onDownloadProgress) {
request.onprogress = options.onDownloadProgress
}

if (options.onUploadProgress) {
request.upload.onprogress = options.onUploadProgress
}

return new Promise((resolve, reject) => {
/**
* @param {Event} event
*/
const handleEvent = (event) => {
switch (event.type) {
case 'error': {
resolve(Response.error())
break
}
case 'load': {
resolve(
new ResponseWithURL(request.responseURL, request.response, {
status: request.status,
statusText: request.statusText,
headers: parseHeaders(request.getAllResponseHeaders())
})
)
break
}
case 'timeout': {
reject(new TimeoutError())
break
}
case 'abort': {
reject(new AbortError())
break
}
default: {
break
}
}
}
request.onerror = handleEvent
request.onload = handleEvent
request.ontimeout = handleEvent
request.onabort = handleEvent

request.send(options.body)
})
}
exports.fetch = fetch
exports.Request = Request
exports.Headers = Headers

/**
* @param {string} input
* @returns {Headers}
*/
const parseHeaders = (input) => {
const headers = new Headers()
for (const line of input.trim().split(/[\r\n]+/)) {
const index = line.indexOf(': ')
if (index > 0) {
headers.set(line.slice(0, index), line.slice(index + 1))
}
}

return headers
}

class ResponseWithURL extends Response {
/**
* @param {string} url
* @param {string|Blob|ArrayBufferView|ArrayBuffer|FormData|ReadableStream<Uint8Array>} body
* @param {ResponseInit} options
*/
constructor (url, body, options) {
super(body, options)
Object.defineProperty(this, 'url', { value: url })
}
}
9 changes: 9 additions & 0 deletions src/http/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict'

// Electron has `XMLHttpRequest` and should get the browser implementation
// instead of node.
if (typeof XMLHttpRequest !== 'undefined') {
module.exports = require('./fetch.browser')
} else {
module.exports = require('./fetch.node')
}
133 changes: 133 additions & 0 deletions src/http/fetch.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// @ts-check
'use strict'

/** @type {import('node-fetch') & typeof fetch} */
// @ts-ignore
const nodeFetch = require('node-fetch')
const toStream = require('it-to-stream')
const { Buffer } = require('buffer')
const { Request, Response, Headers } = nodeFetch
/**
* @typedef {RequestInit & ExtraFetchOptions} FetchOptions
*
* @typedef {import('stream').Readable} Readable
* @typedef {Object} LoadProgress
* @property {number} total
* @property {number} loaded
* @property {boolean} lengthComputable
* @typedef {Object} ExtraFetchOptions
* @property {number} [timeout]
* @property {URLSearchParams} [searchParams]
* @property {function(LoadProgress):void} [onUploadProgress]
* @property {function(LoadProgress):void} [onDownloadProgress]
* @property {string} [overrideMimeType]
* @returns {Promise<Response>}
*/

/**
* @param {string|URL} url
* @param {FetchOptions} [options]
* @returns {Promise<Response>}
*/
const fetch = async (url, options = {}) => {
const { onDownloadProgress } = options

const response = await nodeFetch(url, withUploadProgress(options))

if (onDownloadProgress) {
return withDownloadProgress(response, onDownloadProgress)
} else {
return response
}
}
exports.fetch = fetch
exports.Request = Request
exports.Headers = Headers

/**
* Takes fetch options and wraps request body to track uploda progress if
* `onUploadProgress` is supplied. Otherwise returns options as is.
* @param {FetchOptions} options
* @returns {FetchOptions}
*/
const withUploadProgress = (options) => {
const { onUploadProgress } = options
if (onUploadProgress) {
return {
...options,
// @ts-ignore
body: bodyWithUploadProgress(options, onUploadProgress)
}
} else {
return options
}
}

/**
* Takes request `body` and `onUploadProgress` handler and returns wrapped body
* that as consumed will report progress to suppled `onUploadProgress` handler.
* @param {FetchOptions} init
* @param {function(LoadProgress):void} onUploadProgress
* @returns {Readable}
*/
const bodyWithUploadProgress = (init, onUploadProgress) => {
// @ts-ignore - node-fetch is typed poorly
const { body } = new Response(init.body, init)
// @ts-ignore - Unlike standard Response, node-fetch `body` has a differnt
// type see: see https://github.com/node-fetch/node-fetch/blob/master/src/body.js
const source = iterateBodyWithProgress(body, onUploadProgress)
return toStream.readable(source)
}

/**
* Takes body from node-fetch response as body and `onUploadProgress` handler
* and returns async iterable that emits body chunks and emits
* `onUploadProgress`.
* @param {Buffer|null|Readable} body
* @param {function(LoadProgress):void} onUploadProgress
* @returns {AsyncIterable<Buffer>}
*/
const iterateBodyWithProgress = async function * (body, onUploadProgress) {
/** @type {Buffer|null|Readable} */
if (body == null) {
onUploadProgress({ total: 0, loaded: 0, lengthComputable: true })
} else if (Buffer.isBuffer(body)) {
const total = body.byteLength
const lengthComputable = true
onUploadProgress({ total, loaded: 0, lengthComputable })
yield body
onUploadProgress({ total, loaded: total, lengthComputable })
} else {
const total = 0
const lengthComputable = false
let loaded = 0
onUploadProgress({ total, loaded, lengthComputable })
for await (const chunk of body) {
loaded += chunk.byteLength
yield chunk
onUploadProgress({ total, loaded, lengthComputable })
}
}
}

/**
* Takes node-fetch response and tracks download progress for it.
* @param {Response} response
* @param {function(LoadProgress):void} onDownloadProgress
* @returns {Response}
*/
const withDownloadProgress = (response, onDownloadProgress) => {
/** @type {Readable} */
// @ts-ignore - Unlike standard Response, in node-fetch response body is
// node Readable stream.
const { body } = response
const length = parseInt(response.headers.get('Content-Length'))
const lengthComputable = !isNaN(length)
const total = isNaN(length) ? 0 : length
let loaded = 0
body.on('data', (chunk) => {
loaded += chunk.length
onDownloadProgress({ lengthComputable, total, loaded })
})
return response
}
Loading

0 comments on commit d30be96

Please sign in to comment.