diff --git a/package.json b/package.json
index 8b37399..9141548 100644
--- a/package.json
+++ b/package.json
@@ -12,6 +12,7 @@
     "dist"
   ],
   "browser": {
+    "./src/http/fetch.js": "./src/http/fetch.browser.js",
     "./src/text-encoder.js": "./src/text-encoder.browser.js",
     "./src/text-decoder.js": "./src/text-decoder.browser.js",
     "./src/temp-dir.js": "./src/temp-dir.browser.js",
@@ -44,15 +45,15 @@
     "merge-options": "^2.0.0",
     "nanoid": "^3.1.3",
     "node-fetch": "^2.6.0",
-    "stream-to-it": "^0.2.0"
+    "stream-to-it": "^0.2.0",
+    "it-to-stream": "^0.1.2"
   },
   "devDependencies": {
     "aegir": "^25.0.0",
     "delay": "^4.3.0",
     "it-all": "^1.0.2",
     "it-drain": "^1.0.1",
-    "it-last": "^1.0.2",
-    "it-to-stream": "^0.1.2"
+    "it-last": "^1.0.2"
   },
   "contributors": [
     "Hugo Dias ",
@@ -63,4 +64,4 @@
     "Irakli Gozalishvili ",
     "Marcin Rataj "
   ]
-}
+}
\ No newline at end of file
diff --git a/src/http.js b/src/http.js
index 6655e06..e55fc4d 100644
--- a/src/http.js
+++ b/src/http.js
@@ -1,31 +1,14 @@
 /* eslint-disable no-undef */
 'use strict'
 
-const fetch = require('node-fetch')
+const { fetch, Request, Headers } = require('./http/fetch')
+const { TimeoutError, HTTPError } = require('./http/error')
 const merge = require('merge-options').bind({ ignoreUndefined: true })
 const { URL, URLSearchParams } = require('iso-url')
 const TextDecoder = require('./text-decoder')
 const AbortController = require('abort-controller')
 const anySignal = require('any-signal')
 
-const Request = fetch.Request
-const Headers = fetch.Headers
-
-class TimeoutError extends Error {
-  constructor () {
-    super('Request timed out')
-    this.name = 'TimeoutError'
-  }
-}
-
-class HTTPError extends Error {
-  constructor (response) {
-    super(response.statusText)
-    this.name = 'HTTPError'
-    this.response = response
-  }
-}
-
 const timeout = (promise, ms, abortController) => {
   if (ms === undefined) {
     return promise
@@ -87,6 +70,8 @@ const defaults = {
  * @prop {function(URLSearchParams): URLSearchParams } [transformSearchParams]
  * @prop {function(any): any} [transform] - When iterating the response body, transform each chunk with this function.
  * @prop {function(Response): Promise} [handleError] - Handle errors
+ * @prop {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress] - Can be passed to track upload progress
+ * @prop {function({total:number, loaded:number, lengthComputable:boolean}):void} [onDownloadProgress] - Can be passed to track download progress
  */
 
 class HTTP {
diff --git a/src/http/error.js b/src/http/error.js
new file mode 100644
index 0000000..057d2c5
--- /dev/null
+++ b/src/http/error.js
@@ -0,0 +1,26 @@
+'use strict'
+
+class TimeoutError extends Error {
+  constructor (message = 'Request timed out') {
+    super(message)
+    this.name = 'TimeoutError'
+  }
+}
+exports.TimeoutError = TimeoutError
+
+class AbortError extends Error {
+  constructor (message = 'The operation was aborted.') {
+    super(message)
+    this.name = 'AbortError'
+  }
+}
+exports.AbortError = AbortError
+
+class HTTPError extends Error {
+  constructor (response) {
+    super(response.statusText)
+    this.name = 'HTTPError'
+    this.response = response
+  }
+}
+exports.HTTPError = HTTPError
diff --git a/src/http/fetch.browser.js b/src/http/fetch.browser.js
new file mode 100644
index 0000000..08d1319
--- /dev/null
+++ b/src/http/fetch.browser.js
@@ -0,0 +1,124 @@
+'use strict'
+/* eslint-env browser */
+
+const { TimeoutError, AbortError } = require('./error')
+
+/**
+ * @typedef {RequestInit & ExtraFetchOptions} FetchOptions
+ * @typedef {Object} ExtraFetchOptions
+ * @property {number} [timeout]
+ * @property {URLSearchParams} [searchParams]
+ * @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onUploadProgress]
+ * @property {function({total:number, loaded:number, lengthComputable:boolean}):void} [onDownloadProgress]
+ * @property {string} [overrideMimeType]
+ * @returns {Promise}
+ */
+
+/**
+ * @param {string|URL} url
+ * @param {FetchOptions} [options]
+ * @returns {Promise}
+ */
+const fetch = (url, options = {}) => {
+  const request = new XMLHttpRequest()
+  request.open(options.method || 'GET', url.toString(), true)
+
+  const { timeout } = options
+  if (timeout > 0 && timeout < Infinity) {
+    request.timeout = options.timeout
+  }
+
+  if (options.overrideMimeType != null) {
+    request.overrideMimeType(options.overrideMimeType)
+  }
+
+  if (options.headers) {
+    for (const [name, value] of options.headers.entries()) {
+      request.setRequestHeader(name, value)
+    }
+  }
+
+  if (options.signal) {
+    options.signal.onabort = () => request.abort()
+  }
+
+  if (options.onDownloadProgress) {
+    request.onprogress = options.onDownloadProgress
+  }
+
+  if (options.onUploadProgress) {
+    request.upload.onprogress = options.onUploadProgress
+  }
+
+  return new Promise((resolve, reject) => {
+    /**
+     * @param {Event} event
+     */
+    const handleEvent = (event) => {
+      switch (event.type) {
+        case 'error': {
+          resolve(Response.error())
+          break
+        }
+        case 'load': {
+          resolve(
+            new ResponseWithURL(request.responseURL, request.response, {
+              status: request.status,
+              statusText: request.statusText,
+              headers: parseHeaders(request.getAllResponseHeaders())
+            })
+          )
+          break
+        }
+        case 'timeout': {
+          reject(new TimeoutError())
+          break
+        }
+        case 'abort': {
+          reject(new AbortError())
+          break
+        }
+        default: {
+          break
+        }
+      }
+    }
+    request.onerror = handleEvent
+    request.onload = handleEvent
+    request.ontimeout = handleEvent
+    request.onabort = handleEvent
+
+    request.send(options.body)
+  })
+}
+exports.fetch = fetch
+exports.Request = Request
+exports.Headers = Headers
+
+/**
+ * @param {string} input
+ * @returns {Headers}
+ */
+const parseHeaders = (input) => {
+  const headers = new Headers()
+  for (const line of input.trim().split(/[\r\n]+/)) {
+    const index = line.indexOf(': ')
+    if (index > 0) {
+      headers.set(line.slice(0, index), line.slice(index + 1))
+    }
+  }
+
+  return headers
+}
+
+class ResponseWithURL extends Response {
+  /**
+   * @param {string} url
+   * @param {string|Blob|ArrayBufferView|ArrayBuffer|FormData|ReadableStream} body
+   * @param {ResponseInit} options
+   */
+  constructor (url, body, options) {
+    super(body, options)
+    Object.defineProperty(this, 'url', { value: url })
+  }
+}
diff --git a/src/http/fetch.js b/src/http/fetch.js
new file mode 100644
index 0000000..c968b1e
--- /dev/null
+++ b/src/http/fetch.js
@@ -0,0 +1,9 @@
+'use strict'
+
+// Electron has `XMLHttpRequest` and should get the browser implementation
+// instead of node.
+if (typeof XMLHttpRequest !== 'undefined') {
+  module.exports = require('./fetch.browser')
+} else {
+  module.exports = require('./fetch.node')
+}
diff --git a/src/http/fetch.node.js b/src/http/fetch.node.js
new file mode 100644
index 0000000..3c77f44
--- /dev/null
+++ b/src/http/fetch.node.js
@@ -0,0 +1,133 @@
+// @ts-check
+'use strict'
+
+/** @type {import('node-fetch') & typeof fetch} */
+// @ts-ignore
+const nodeFetch = require('node-fetch')
+const toStream = require('it-to-stream')
+const { Buffer } = require('buffer')
+const { Request, Response, Headers } = nodeFetch
+/**
+ * @typedef {RequestInit & ExtraFetchOptions} FetchOptions
+ *
+ * @typedef {import('stream').Readable} Readable
+ * @typedef {Object} LoadProgress
+ * @property {number} total
+ * @property {number} loaded
+ * @property {boolean} lengthComputable
+ * @typedef {Object} ExtraFetchOptions
+ * @property {number} [timeout]
+ * @property {URLSearchParams} [searchParams]
+ * @property {function(LoadProgress):void} [onUploadProgress]
+ * @property {function(LoadProgress):void} [onDownloadProgress]
+ * @property {string} [overrideMimeType]
+ * @returns {Promise}
+ */
+
+/**
+ * @param {string|URL} url
+ * @param {FetchOptions} [options]
+ * @returns {Promise}
+ */
+const fetch = async (url, options = {}) => {
+  const { onDownloadProgress } = options
+
+  const response = await nodeFetch(url, withUploadProgress(options))
+
+  if (onDownloadProgress) {
+    return withDownloadProgress(response, onDownloadProgress)
+  } else {
+    return response
+  }
+}
+exports.fetch = fetch
+exports.Request = Request
+exports.Headers = Headers
+
+/**
+ * Takes fetch options and wraps the request body to track upload progress if
+ * `onUploadProgress` is supplied. Otherwise returns the options as is.
+ * @param {FetchOptions} options
+ * @returns {FetchOptions}
+ */
+const withUploadProgress = (options) => {
+  const { onUploadProgress } = options
+  if (onUploadProgress) {
+    return {
+      ...options,
+      // @ts-ignore
+      body: bodyWithUploadProgress(options, onUploadProgress)
+    }
+  } else {
+    return options
+  }
+}
+
+/**
+ * Takes the request `body` and an `onUploadProgress` handler and returns a wrapped
+ * body that, as it is consumed, reports progress to the supplied `onUploadProgress` handler.
+ * @param {FetchOptions} init
+ * @param {function(LoadProgress):void} onUploadProgress
+ * @returns {Readable}
+ */
+const bodyWithUploadProgress = (init, onUploadProgress) => {
+  // @ts-ignore - node-fetch is typed poorly
+  const { body } = new Response(init.body, init)
+  // @ts-ignore - Unlike the standard Response, node-fetch `body` has a different
+  // type, see: https://github.com/node-fetch/node-fetch/blob/master/src/body.js
+  const source = iterateBodyWithProgress(body, onUploadProgress)
+  return toStream.readable(source)
+}
+
+/**
+ * Takes the body from a node-fetch response and an `onUploadProgress` handler
+ * and returns an async iterable that yields the body chunks and reports
+ * progress to `onUploadProgress`.
+ * @param {Buffer|null|Readable} body
+ * @param {function(LoadProgress):void} onUploadProgress
+ * @returns {AsyncIterable}
+ */
+const iterateBodyWithProgress = async function * (body, onUploadProgress) {
+  /** @type {Buffer|null|Readable} */
+  if (body == null) {
+    onUploadProgress({ total: 0, loaded: 0, lengthComputable: true })
+  } else if (Buffer.isBuffer(body)) {
+    const total = body.byteLength
+    const lengthComputable = true
+    onUploadProgress({ total, loaded: 0, lengthComputable })
+    yield body
+    onUploadProgress({ total, loaded: total, lengthComputable })
+  } else {
+    const total = 0
+    const lengthComputable = false
+    let loaded = 0
+    onUploadProgress({ total, loaded, lengthComputable })
+    for await (const chunk of body) {
+      loaded += chunk.byteLength
+      yield chunk
+      onUploadProgress({ total, loaded, lengthComputable })
+    }
+  }
+}
+
+/**
+ * Takes a node-fetch response and tracks download progress on it.
+ * @param {Response} response
+ * @param {function(LoadProgress):void} onDownloadProgress
+ * @returns {Response}
+ */
+const withDownloadProgress = (response, onDownloadProgress) => {
+  /** @type {Readable} */
+  // @ts-ignore - Unlike the standard Response, in node-fetch the response body
+  // is a node Readable stream.
+  const { body } = response
+  const length = parseInt(response.headers.get('Content-Length'))
+  const lengthComputable = !isNaN(length)
+  const total = isNaN(length) ? 0 : length
+  let loaded = 0
+  body.on('data', (chunk) => {
+    loaded += chunk.length
+    onDownloadProgress({ lengthComputable, total, loaded })
+  })
+  return response
+}
diff --git a/test/http.spec.js b/test/http.spec.js
index b79ea62..2e48a01 100644
--- a/test/http.spec.js
+++ b/test/http.spec.js
@@ -150,4 +150,29 @@ describe('http', function () {
 
     await expect(drain(HTTP.ndjson(res.body))).to.eventually.be.rejectedWith(/aborted/)
   })
+
+  it('progress events', async () => {
+    let upload = 0
+    let download = 0
+    const body = new Uint8Array(1000000 / 2)
+    const request = await HTTP.post(`${process.env.ECHO_SERVER}/echo`, {
+      body,
+      onUploadProgress: (progress) => {
+        expect(progress).to.have.property('lengthComputable').to.be.a('boolean')
+        expect(progress).to.have.property('total', body.byteLength)
+        expect(progress).to.have.property('loaded').to.be.a('number')
+        upload += 1
+      },
+      onDownloadProgress: (progress) => {
+        expect(progress).to.have.property('lengthComputable').to.be.a('boolean')
+        expect(progress).to.have.property('total').to.be.a('number')
+        expect(progress).to.have.property('loaded').to.be.a('number')
+        download += 1
+      }
+    })
+    await all(request.iterator())
+
+    expect(upload).to.be.greaterThan(0)
+    expect(download).to.be.greaterThan(0)
+  })
 })
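For illustration, a minimal usage sketch of the new `onUploadProgress` / `onDownloadProgress` options introduced by this diff. It assumes the `HTTP` class exported from `src/http.js` above; the upload URL is a placeholder, not an endpoint defined by this change. Both callbacks receive the `{ total, loaded, lengthComputable }` shape documented in the JSDoc.

'use strict'

// Illustrative sketch only: `./src/http` is this repo's HTTP helper and the
// endpoint below is a placeholder.
const HTTP = require('./src/http')

const main = async () => {
  const body = new Uint8Array(1024 * 1024)

  const response = await HTTP.post('https://example.com/upload', {
    body,
    // Reports progress while the request body is being consumed.
    onUploadProgress: ({ loaded, total, lengthComputable }) => {
      console.log('upload', lengthComputable ? `${loaded}/${total}` : loaded)
    },
    // Reports progress as response chunks arrive.
    onDownloadProgress: ({ loaded }) => {
      console.log('downloaded bytes', loaded)
    }
  })

  return response.arrayBuffer()
}

main().catch(console.error)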