Skip to content

Commit 223c6fc

Browse files
committed
[feature] Add support for Blob
Closes #2206
1 parent ddfe4a8 commit 223c6fc

File tree

10 files changed

+761
-93
lines changed

10 files changed

+761
-93
lines changed

doc/ws.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,11 @@ does nothing if `type` is not one of `'close'`, `'error'`, `'message'`, or
466466
- {String}
467467

468468
A string indicating the type of binary data being transmitted by the connection.
469-
This should be one of "nodebuffer", "arraybuffer" or "fragments". Defaults to
470-
"nodebuffer". Type "fragments" will emit the array of fragments as received from
471-
the sender, without copyfull concatenation, which is useful for the performance
472-
of binary protocols transferring large messages with multiple fragments.
469+
This should be one of "nodebuffer", "arraybuffer", "blob", or "fragments".
470+
Defaults to "nodebuffer". Type "fragments" will emit the array of fragments as
471+
received from the sender, without copyfull concatenation, which is useful for
472+
the performance of binary protocols transferring large messages with multiple
473+
fragments.
473474

474475
### websocket.bufferedAmount
475476

@@ -538,7 +539,8 @@ is a noop if the ready state is `CONNECTING` or `CLOSED`.
538539

539540
### websocket.ping([data[, mask]][, callback])
540541

541-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
542+
- `data`
543+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
542544
data to send in the ping frame.
543545
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
544546
`true` when `websocket` is not a server client.
@@ -550,7 +552,8 @@ Send a ping. This method throws an error if the ready state is `CONNECTING`.
550552

551553
### websocket.pong([data[, mask]][, callback])
552554

553-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
555+
- `data`
556+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
554557
data to send in the pong frame.
555558
- `mask` {Boolean} Specifies whether `data` should be masked or not. Defaults to
556559
`true` when `websocket` is not a server client.
@@ -588,7 +591,8 @@ only removes listeners added with
588591

589592
### websocket.send(data[, options][, callback])
590593

591-
- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
594+
- `data`
595+
{Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray|Blob} The
592596
data to send. `Object` values are only supported if they conform to the
593597
requirements of [`Buffer.from()`][]. If those constraints are not met, a
594598
`TypeError` is thrown.

lib/constants.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
'use strict';
22

3+
const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments'];
4+
const hasBlob = typeof Blob !== 'undefined';
5+
6+
if (hasBlob) BINARY_TYPES.push('blob');
7+
38
module.exports = {
4-
BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'],
9+
BINARY_TYPES,
510
EMPTY_BUFFER: Buffer.alloc(0),
611
GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
12+
hasBlob,
713
kForOnEventAttribute: Symbol('kIsForOnEventAttribute'),
814
kListener: Symbol('kListener'),
915
kStatusCode: Symbol('status-code'),

lib/receiver.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,8 @@ class Receiver extends Writable {
559559
data = concat(fragments, messageLength);
560560
} else if (this._binaryType === 'arraybuffer') {
561561
data = toArrayBuffer(concat(fragments, messageLength));
562+
} else if (this._binaryType === 'blob') {
563+
data = new Blob(fragments);
562564
} else {
563565
data = fragments;
564566
}

lib/sender.js

Lines changed: 145 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ const { Duplex } = require('stream');
66
const { randomFillSync } = require('crypto');
77

88
const PerMessageDeflate = require('./permessage-deflate');
9-
const { EMPTY_BUFFER } = require('./constants');
10-
const { isValidStatusCode } = require('./validation');
9+
const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
10+
const { isBlob, isValidStatusCode } = require('./validation');
1111
const { mask: applyMask, toBuffer } = require('./buffer-util');
1212

1313
const kByteLength = Symbol('kByteLength');
@@ -16,6 +16,10 @@ const RANDOM_POOL_SIZE = 8 * 1024;
1616
let randomPool;
1717
let randomPoolPointer = RANDOM_POOL_SIZE;
1818

19+
const DEFAULT = 0;
20+
const DEFLATING = 1;
21+
const GET_BLOB_DATA = 2;
22+
1923
/**
2024
* HyBi Sender implementation.
2125
*/
@@ -42,8 +46,10 @@ class Sender {
4246
this._compress = false;
4347

4448
this._bufferedBytes = 0;
45-
this._deflating = false;
4649
this._queue = [];
50+
this._state = DEFAULT;
51+
this.onerror = NOOP;
52+
this[kWebSocket] = undefined;
4753
}
4854

4955
/**
@@ -205,7 +211,7 @@ class Sender {
205211
rsv1: false
206212
};
207213

208-
if (this._deflating) {
214+
if (this._state !== DEFAULT) {
209215
this.enqueue([this.dispatch, buf, false, options, cb]);
210216
} else {
211217
this.sendFrame(Sender.frame(buf, options), cb);
@@ -227,6 +233,9 @@ class Sender {
227233
if (typeof data === 'string') {
228234
byteLength = Buffer.byteLength(data);
229235
readOnly = false;
236+
} else if (isBlob(data)) {
237+
byteLength = data.size;
238+
readOnly = false;
230239
} else {
231240
data = toBuffer(data);
232241
byteLength = data.length;
@@ -248,7 +257,13 @@ class Sender {
248257
rsv1: false
249258
};
250259

251-
if (this._deflating) {
260+
if (isBlob(data)) {
261+
if (this._state !== DEFAULT) {
262+
this.enqueue([this.getBlobData, data, false, options, cb]);
263+
} else {
264+
this.getBlobData(data, false, options, cb);
265+
}
266+
} else if (this._state !== DEFAULT) {
252267
this.enqueue([this.dispatch, data, false, options, cb]);
253268
} else {
254269
this.sendFrame(Sender.frame(data, options), cb);
@@ -270,6 +285,9 @@ class Sender {
270285
if (typeof data === 'string') {
271286
byteLength = Buffer.byteLength(data);
272287
readOnly = false;
288+
} else if (isBlob(data)) {
289+
byteLength = data.size;
290+
readOnly = false;
273291
} else {
274292
data = toBuffer(data);
275293
byteLength = data.length;
@@ -291,7 +309,13 @@ class Sender {
291309
rsv1: false
292310
};
293311

294-
if (this._deflating) {
312+
if (isBlob(data)) {
313+
if (this._state !== DEFAULT) {
314+
this.enqueue([this.getBlobData, data, false, options, cb]);
315+
} else {
316+
this.getBlobData(data, false, options, cb);
317+
}
318+
} else if (this._state !== DEFAULT) {
295319
this.enqueue([this.dispatch, data, false, options, cb]);
296320
} else {
297321
this.sendFrame(Sender.frame(data, options), cb);
@@ -325,6 +349,9 @@ class Sender {
325349
if (typeof data === 'string') {
326350
byteLength = Buffer.byteLength(data);
327351
readOnly = false;
352+
} else if (isBlob(data)) {
353+
byteLength = data.size;
354+
readOnly = false;
328355
} else {
329356
data = toBuffer(data);
330357
byteLength = data.length;
@@ -352,40 +379,107 @@ class Sender {
352379

353380
if (options.fin) this._firstFragment = true;
354381

355-
if (perMessageDeflate) {
356-
const opts = {
357-
[kByteLength]: byteLength,
358-
fin: options.fin,
359-
generateMask: this._generateMask,
360-
mask: options.mask,
361-
maskBuffer: this._maskBuffer,
362-
opcode,
363-
readOnly,
364-
rsv1
365-
};
366-
367-
if (this._deflating) {
368-
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
382+
const opts = {
383+
[kByteLength]: byteLength,
384+
fin: options.fin,
385+
generateMask: this._generateMask,
386+
mask: options.mask,
387+
maskBuffer: this._maskBuffer,
388+
opcode,
389+
readOnly,
390+
rsv1
391+
};
392+
393+
if (isBlob(data)) {
394+
if (this._state !== DEFAULT) {
395+
this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
369396
} else {
370-
this.dispatch(data, this._compress, opts, cb);
397+
this.getBlobData(data, this._compress, opts, cb);
371398
}
399+
} else if (this._state !== DEFAULT) {
400+
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
372401
} else {
373-
this.sendFrame(
374-
Sender.frame(data, {
375-
[kByteLength]: byteLength,
376-
fin: options.fin,
377-
generateMask: this._generateMask,
378-
mask: options.mask,
379-
maskBuffer: this._maskBuffer,
380-
opcode,
381-
readOnly,
382-
rsv1: false
383-
}),
384-
cb
385-
);
402+
this.dispatch(data, this._compress, opts, cb);
386403
}
387404
}
388405

406+
/**
407+
* Calls queued callbacks with an error.
408+
*
409+
* @param {Error} err The error to call the callbacks with
410+
* @param {Function} [cb] The first callback
411+
* @private
412+
*/
413+
callCallbacks(err, cb) {
414+
if (typeof cb === 'function') cb(err);
415+
416+
for (let i = 0; i < this._queue.length; i++) {
417+
const params = this._queue[i];
418+
const callback = params[params.length - 1];
419+
420+
if (typeof callback === 'function') callback(err);
421+
}
422+
}
423+
424+
/**
425+
* Gets the contents of a blob as binary data.
426+
*
427+
* @param {Blob} blob The blob
428+
* @param {Boolean} [compress=false] Specifies whether or not to compress
429+
* the data
430+
* @param {Object} options Options object
431+
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
432+
* FIN bit
433+
* @param {Function} [options.generateMask] The function used to generate the
434+
* masking key
435+
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
436+
* `data`
437+
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
438+
* key
439+
* @param {Number} options.opcode The opcode
440+
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
441+
* modified
442+
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
443+
* RSV1 bit
444+
* @param {Function} [cb] Callback
445+
* @private
446+
*/
447+
getBlobData(blob, compress, options, cb) {
448+
this._bufferedBytes += options[kByteLength];
449+
this._state = GET_BLOB_DATA;
450+
451+
blob
452+
.arrayBuffer()
453+
.then((arrayBuffer) => {
454+
if (this._socket.destroyed) {
455+
const err = new Error(
456+
'The socket was closed while the blob was being read'
457+
);
458+
459+
this.callCallbacks(err, cb);
460+
return;
461+
}
462+
463+
this._bufferedBytes -= options[kByteLength];
464+
const data = toBuffer(arrayBuffer);
465+
466+
if (!compress) {
467+
this._state = DEFAULT;
468+
this.sendFrame(Sender.frame(data, options), cb);
469+
this.dequeue();
470+
} else {
471+
this.dispatch(data, compress, options, cb);
472+
}
473+
})
474+
.catch((err) => {
475+
//
476+
// `onError` is called in the next tick to not suppress the throwing
477+
// behavior of the `'error'` event emitted by the `WebSocket` object.
478+
//
479+
process.nextTick(onError, this, err, cb);
480+
});
481+
}
482+
389483
/**
390484
* Dispatches a message.
391485
*
@@ -418,27 +512,19 @@ class Sender {
418512
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
419513

420514
this._bufferedBytes += options[kByteLength];
421-
this._deflating = true;
515+
this._state = DEFLATING;
422516
perMessageDeflate.compress(data, options.fin, (_, buf) => {
423517
if (this._socket.destroyed) {
424518
const err = new Error(
425519
'The socket was closed while data was being compressed'
426520
);
427521

428-
if (typeof cb === 'function') cb(err);
429-
430-
for (let i = 0; i < this._queue.length; i++) {
431-
const params = this._queue[i];
432-
const callback = params[params.length - 1];
433-
434-
if (typeof callback === 'function') callback(err);
435-
}
436-
522+
this.callCallbacks(err, cb);
437523
return;
438524
}
439525

440526
this._bufferedBytes -= options[kByteLength];
441-
this._deflating = false;
527+
this._state = DEFAULT;
442528
options.readOnly = false;
443529
this.sendFrame(Sender.frame(buf, options), cb);
444530
this.dequeue();
@@ -451,7 +537,7 @@ class Sender {
451537
* @private
452538
*/
453539
dequeue() {
454-
while (!this._deflating && this._queue.length) {
540+
while (this._state === DEFAULT && this._queue.length) {
455541
const params = this._queue.shift();
456542

457543
this._bufferedBytes -= params[3][kByteLength];
@@ -490,3 +576,16 @@ class Sender {
490576
}
491577

492578
module.exports = Sender;
579+
580+
/**
581+
* Handles a `Sender` error.
582+
*
583+
* @param {Sender} sender The `Sender` instance
584+
* @param {Error} err The error
585+
* @param {Function} [cb] The first pending callback
586+
* @private
587+
*/
588+
function onError(sender, err, cb) {
589+
sender.callCallbacks(err, cb);
590+
sender.onerror(err);
591+
}

0 commit comments

Comments
 (0)