Skip to content

Commit 1b956f4

Browse files
committed
[feature] Add support for Blob
Closes #2206
1 parent 6a00029 commit 1b956f4

File tree

9 files changed

+769
-93
lines changed

9 files changed

+769
-93
lines changed

doc/ws.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,11 @@ does nothing if `type` is not one of `'close'`, `'error'`, `'message'`, or
466466
- {String}
467467

468468
A string indicating the type of binary data being transmitted by the connection.
469-
This should be one of "nodebuffer", "arraybuffer" or "fragments". Defaults to
470-
"nodebuffer". Type "fragments" will emit the array of fragments as received from
471-
the sender, without copyfull concatenation, which is useful for the performance
472-
of binary protocols transferring large messages with multiple fragments.
469+
This should be one of "nodebuffer", "arraybuffer", "blob", or "fragments".
470+
Defaults to "nodebuffer". Type "fragments" will emit the array of fragments as
471+
received from the sender, without copyfull concatenation, which is useful for
472+
the performance of binary protocols transferring large messages with multiple
473+
fragments.
473474

474475
### websocket.bufferedAmount
475476

@@ -538,7 +539,8 @@ is a noop if the ready state is `CONNECTING` or `CLOSED`.
538539

539540
### websocket.ping([data[, mask]][, callback])
540541

541-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
542+
- `data`
543+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
542544
data to send in the ping frame.
543545
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
544546
`true` when `websocket` is not a server client.
@@ -550,7 +552,8 @@ Send a ping. This method throws an error if the ready state is `CONNECTING`.
550552

551553
### websocket.pong([data[, mask]][, callback])
552554

553-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
555+
- `data`
556+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
554557
data to send in the pong frame.
555558
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
556559
`true` when `websocket` is not a server client.
@@ -588,7 +591,8 @@ only removes listeners added with
588591

589592
### websocket.send(data[, options][, callback])
590593

591-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
594+
- `data`
595+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
592596
data to send. `Object` values are only supported if they conform to the
593597
requirements of [`Buffer.from()`][]. If those constraints are not met, a
594598
`TypeError` is thrown.

lib/constants.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
'use strict';
22

3+
const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments'];
4+
const hasBlob = typeof Blob !== 'undefined';
5+
6+
if (hasBlob) BINARY_TYPES.push('blob');
7+
38
module.exports = {
4-
BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'],
9+
BINARY_TYPES,
510
EMPTY_BUFFER: Buffer.alloc(0),
611
GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
12+
hasBlob,
713
kForOnEventAttribute: Symbol('kIsForOnEventAttribute'),
814
kListener: Symbol('kListener'),
915
kStatusCode: Symbol('status-code'),

lib/receiver.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,8 @@ class Receiver extends Writable {
559559
data = concat(fragments, messageLength);
560560
} else if (this._binaryType === 'arraybuffer') {
561561
data = toArrayBuffer(concat(fragments, messageLength));
562+
} else if (this._binaryType === 'blob') {
563+
data = new Blob(fragments);
562564
} else {
563565
data = fragments;
564566
}

lib/sender.js

Lines changed: 145 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const { Duplex } = require('stream');
66
const { randomFillSync } = require('crypto');
77

88
const PerMessageDeflate = require('./permessage-deflate');
9-
const { EMPTY_BUFFER } = require('./constants');
10-
const { isValidStatusCode } = require('./validation');
9+
const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
10+
const { isBlob, isValidStatusCode } = require('./validation');
1111
const { mask: applyMask, toBuffer } = require('./buffer-util');
1212

1313
const kByteLength = Symbol('kByteLength');
@@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024;
1616
let randomPool;
1717
let randomPoolPointer = RANDOM_POOL_SIZE;
1818

19+
const DEFAULT = 0;
20+
const DEFLATING = 1;
21+
const GET_BLOB_DATA = 2;
22+
1923
/**
2024
* HyBi Sender implementation.
2125
*/
@@ -42,8 +46,10 @@ class Sender {
4246
this._compress = false;
4347

4448
this._bufferedBytes = 0;
45-
this._deflating = false;
4649
this._queue = [];
50+
this._state = DEFAULT;
51+
this.onerror = NOOP;
52+
this[kWebSocket] = undefined;
4753
}
4854

4955
/**
@@ -210,7 +216,7 @@ class Sender {
210216
rsv1: false
211217
};
212218

213-
if (this._deflating) {
219+
if (this._state !== DEFAULT) {
214220
this.enqueue([this.dispatch, buf, false, options, cb]);
215221
} else {
216222
this.sendFrame(Sender.frame(buf, options), cb);
@@ -232,6 +238,9 @@ class Sender {
232238
if (typeof data === 'string') {
233239
byteLength = Buffer.byteLength(data);
234240
readOnly = false;
241+
} else if (isBlob(data)) {
242+
byteLength = data.size;
243+
readOnly = false;
235244
} else {
236245
data = toBuffer(data);
237246
byteLength = data.length;
@@ -253,7 +262,13 @@ class Sender {
253262
rsv1: false
254263
};
255264

256-
if (this._deflating) {
265+
if (isBlob(data)) {
266+
if (this._state !== DEFAULT) {
267+
this.enqueue([this.getBlobData, data, false, options, cb]);
268+
} else {
269+
this.getBlobData(data, false, options, cb);
270+
}
271+
} else if (this._state !== DEFAULT) {
257272
this.enqueue([this.dispatch, data, false, options, cb]);
258273
} else {
259274
this.sendFrame(Sender.frame(data, options), cb);
@@ -275,6 +290,9 @@ class Sender {
275290
if (typeof data === 'string') {
276291
byteLength = Buffer.byteLength(data);
277292
readOnly = false;
293+
} else if (isBlob(data)) {
294+
byteLength = data.size;
295+
readOnly = false;
278296
} else {
279297
data = toBuffer(data);
280298
byteLength = data.length;
@@ -296,7 +314,13 @@ class Sender {
296314
rsv1: false
297315
};
298316

299-
if (this._deflating) {
317+
if (isBlob(data)) {
318+
if (this._state !== DEFAULT) {
319+
this.enqueue([this.getBlobData, data, false, options, cb]);
320+
} else {
321+
this.getBlobData(data, false, options, cb);
322+
}
323+
} else if (this._state !== DEFAULT) {
300324
this.enqueue([this.dispatch, data, false, options, cb]);
301325
} else {
302326
this.sendFrame(Sender.frame(data, options), cb);
@@ -330,6 +354,9 @@ class Sender {
330354
if (typeof data === 'string') {
331355
byteLength = Buffer.byteLength(data);
332356
readOnly = false;
357+
} else if (isBlob(data)) {
358+
byteLength = data.size;
359+
readOnly = false;
333360
} else {
334361
data = toBuffer(data);
335362
byteLength = data.length;
@@ -357,40 +384,107 @@ class Sender {
357384

358385
if (options.fin) this._firstFragment = true;
359386

360-
if (perMessageDeflate) {
361-
const opts = {
362-
[kByteLength]: byteLength,
363-
fin: options.fin,
364-
generateMask: this._generateMask,
365-
mask: options.mask,
366-
maskBuffer: this._maskBuffer,
367-
opcode,
368-
readOnly,
369-
rsv1
370-
};
371-
372-
if (this._deflating) {
373-
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
387+
const opts = {
388+
[kByteLength]: byteLength,
389+
fin: options.fin,
390+
generateMask: this._generateMask,
391+
mask: options.mask,
392+
maskBuffer: this._maskBuffer,
393+
opcode,
394+
readOnly,
395+
rsv1
396+
};
397+
398+
if (isBlob(data)) {
399+
if (this._state !== DEFAULT) {
400+
this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
374401
} else {
375-
this.dispatch(data, this._compress, opts, cb);
402+
this.getBlobData(data, this._compress, opts, cb);
376403
}
404+
} else if (this._state !== DEFAULT) {
405+
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
377406
} else {
378-
this.sendFrame(
379-
Sender.frame(data, {
380-
[kByteLength]: byteLength,
381-
fin: options.fin,
382-
generateMask: this._generateMask,
383-
mask: options.mask,
384-
maskBuffer: this._maskBuffer,
385-
opcode,
386-
readOnly,
387-
rsv1: false
388-
}),
389-
cb
390-
);
407+
this.dispatch(data, this._compress, opts, cb);
391408
}
392409
}
393410

411+
/**
412+
* Calls queued callbacks with an error.
413+
*
414+
* @param {Error} err The error to call the callbacks with
415+
* @param {Function} [cb] The first callback
416+
* @private
417+
*/
418+
callCallbacks(err, cb) {
419+
if (typeof cb === 'function') cb(err);
420+
421+
for (let i = 0; i < this._queue.length; i++) {
422+
const params = this._queue[i];
423+
const callback = params[params.length - 1];
424+
425+
if (typeof callback === 'function') callback(err);
426+
}
427+
}
428+
429+
/**
430+
* Gets the contents of a blob as binary data.
431+
*
432+
* @param {Blob} blob The blob
433+
* @param {Boolean} [compress=false] Specifies whether or not to compress
434+
* the data
435+
* @param {Object} options Options object
436+
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
437+
* FIN bit
438+
* @param {Function} [options.generateMask] The function used to generate the
439+
* masking key
440+
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
441+
* `data`
442+
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
443+
* key
444+
* @param {Number} options.opcode The opcode
445+
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
446+
* modified
447+
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
448+
* RSV1 bit
449+
* @param {Function} [cb] Callback
450+
* @private
451+
*/
452+
getBlobData(blob, compress, options, cb) {
453+
this._bufferedBytes += options[kByteLength];
454+
this._state = GET_BLOB_DATA;
455+
456+
blob
457+
.arrayBuffer()
458+
.then((arrayBuffer) => {
459+
if (this._socket.destroyed) {
460+
const err = new Error(
461+
'The socket was closed while the blob was being read'
462+
);
463+
464+
this.callCallbacks(err, cb);
465+
return;
466+
}
467+
468+
this._bufferedBytes -= options[kByteLength];
469+
const data = toBuffer(arrayBuffer);
470+
471+
if (!compress) {
472+
this._state = DEFAULT;
473+
this.sendFrame(Sender.frame(data, options), cb);
474+
this.dequeue();
475+
} else {
476+
this.dispatch(data, compress, options, cb);
477+
}
478+
})
479+
.catch((err) => {
480+
//
481+
// `onError` is called in the next tick to not suppress the throwing
482+
// behavior of the `'error'` event emitted by the `WebSocket` object.
483+
//
484+
process.nextTick(onError, this, err, cb);
485+
});
486+
}
487+
394488
/**
395489
* Dispatches a message.
396490
*
@@ -423,27 +517,19 @@ class Sender {
423517
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
424518

425519
this._bufferedBytes += options[kByteLength];
426-
this._deflating = true;
520+
this._state = DEFLATING;
427521
perMessageDeflate.compress(data, options.fin, (_, buf) => {
428522
if (this._socket.destroyed) {
429523
const err = new Error(
430524
'The socket was closed while data was being compressed'
431525
);
432526

433-
if (typeof cb === 'function') cb(err);
434-
435-
for (let i = 0; i < this._queue.length; i++) {
436-
const params = this._queue[i];
437-
const callback = params[params.length - 1];
438-
439-
if (typeof callback === 'function') callback(err);
440-
}
441-
527+
this.callCallbacks(err, cb);
442528
return;
443529
}
444530

445531
this._bufferedBytes -= options[kByteLength];
446-
this._deflating = false;
532+
this._state = DEFAULT;
447533
options.readOnly = false;
448534
this.sendFrame(Sender.frame(buf, options), cb);
449535
this.dequeue();
@@ -456,7 +542,7 @@ class Sender {
456542
* @private
457543
*/
458544
dequeue() {
459-
while (!this._deflating && this._queue.length) {
545+
while (this._state === DEFAULT && this._queue.length) {
460546
const params = this._queue.shift();
461547

462548
this._bufferedBytes -= params[3][kByteLength];
@@ -495,3 +581,16 @@ class Sender {
495581
}
496582

497583
module.exports = Sender;
584+
585+
/**
586+
* Handles a `Sender` error.
587+
*
588+
* @param {Sender} sender The `Sender` instance
589+
* @param {Error} err The error
590+
* @param {Function} [cb] The first pending callback
591+
* @private
592+
*/
593+
function onError(sender, err, cb) {
594+
sender.callCallbacks(err, cb);
595+
sender.onerror(err);
596+
}

0 commit comments

Comments
 (0)