Skip to content

Commit 4381dd3

Browse files
committed
stream: refactor to use more primordials
1 parent b9dfda9 commit 4381dd3

File tree

13 files changed

+118
-90
lines changed

13 files changed

+118
-90
lines changed

lib/internal/streams/buffer_list.js

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict';
22

33
const {
4+
StringPrototypeSlice,
45
SymbolIterator,
56
Uint8Array,
7+
TypedArrayPrototypeSet,
8+
TypedArrayPrototypeSlice,
69
} = primordials;
710

811
const { Buffer } = require('buffer');
@@ -67,7 +70,7 @@ module.exports = class BufferList {
6770
let p = this.head;
6871
let i = 0;
6972
while (p) {
70-
ret.set(p.data, i);
73+
TypedArrayPrototypeSet(ret, p.data, i);
7174
i += p.data.length;
7275
p = p.next;
7376
}
@@ -78,10 +81,11 @@ module.exports = class BufferList {
7881
consume(n, hasStrings) {
7982
const data = this.head.data;
8083
if (n < data.length) {
81-
// `slice` is the same for buffers and strings.
82-
const slice = data.slice(0, n);
83-
this.head.data = data.slice(n);
84-
return slice;
84+
const slice = typeof data === 'string' ?
85+
StringPrototypeSlice :
86+
TypedArrayPrototypeSlice;
87+
this.head.data = slice(data, n);
88+
return slice(data, 0, n);
8589
}
8690
if (n === data.length) {
8791
// First chunk is a perfect match.
@@ -120,9 +124,9 @@ module.exports = class BufferList {
120124
else
121125
this.head = this.tail = null;
122126
} else {
123-
ret += str.slice(0, n);
127+
ret += StringPrototypeSlice(str, 0, n);
124128
this.head = p;
125-
p.data = str.slice(n);
129+
p.data = StringPrototypeSlice(str, n);
126130
}
127131
break;
128132
}
@@ -141,20 +145,22 @@ module.exports = class BufferList {
141145
do {
142146
const buf = p.data;
143147
if (n > buf.length) {
144-
ret.set(buf, retLen - n);
148+
TypedArrayPrototypeSet(ret, buf, retLen - n);
145149
n -= buf.length;
146150
} else {
147151
if (n === buf.length) {
148-
ret.set(buf, retLen - n);
152+
TypedArrayPrototypeSet(ret, buf, retLen - n);
149153
++c;
150154
if (p.next)
151155
this.head = p.next;
152156
else
153157
this.head = this.tail = null;
154158
} else {
155-
ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
159+
TypedArrayPrototypeSet(ret,
160+
new Uint8Array(buf.buffer, buf.byteOffset, n),
161+
retLen - n);
156162
this.head = p;
157-
p.data = buf.slice(n);
163+
p.data = TypedArrayPrototypeSlice(buf, n);
158164
}
159165
break;
160166
}

lib/internal/streams/destroy.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
const {
44
ERR_MULTIPLE_CALLBACK
55
} = require('internal/errors').codes;
6-
const { Symbol } = primordials;
6+
const {
7+
FunctionPrototypeCall,
8+
Symbol,
9+
} = primordials;
710

811
const kDestroy = Symbol('kDestroy');
912
const kConstruct = Symbol('kConstruct');
@@ -93,7 +96,8 @@ function _destroy(self, err, cb) {
9396
try {
9497
const then = result.then;
9598
if (typeof then === 'function') {
96-
then.call(
99+
FunctionPrototypeCall(
100+
then,
97101
result,
98102
function() {
99103
if (called)
@@ -311,7 +315,8 @@ function constructNT(stream) {
311315
try {
312316
const then = result.then;
313317
if (typeof then === 'function') {
314-
then.call(
318+
FunctionPrototypeCall(
319+
then,
315320
result,
316321
function() {
317322
// If the callback was invoked, do nothing further.

lib/internal/streams/duplex.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
'use strict';
2828

2929
const {
30+
FunctionPrototypeCall,
3031
ObjectDefineProperties,
3132
ObjectGetOwnPropertyDescriptor,
3233
ObjectKeys,
@@ -53,8 +54,8 @@ function Duplex(options) {
5354
if (!(this instanceof Duplex))
5455
return new Duplex(options);
5556

56-
Readable.call(this, options);
57-
Writable.call(this, options);
57+
FunctionPrototypeCall(Readable, this, options);
58+
FunctionPrototypeCall(Writable, this, options);
5859
this.allowHalfOpen = true;
5960

6061
if (options) {

lib/internal/streams/end-of-stream.js

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
'use strict';
55

6+
const {
7+
FunctionPrototype,
8+
FunctionPrototypeCall,
9+
} = primordials;
610
const {
711
ERR_STREAM_PREMATURE_CLOSE
812
} = require('internal/errors').codes;
@@ -53,7 +57,7 @@ function isWritableFinished(stream) {
5357
return wState.finished || (wState.ended && wState.length === 0);
5458
}
5559

56-
function nop() {}
60+
const nop = FunctionPrototype;
5761

5862
function isReadableEnded(stream) {
5963
if (stream.readableEnded) return true;
@@ -110,7 +114,7 @@ function eos(stream, options, callback) {
110114
if (stream.destroyed) willEmitClose = false;
111115

112116
if (willEmitClose && (!stream.readable || readable)) return;
113-
if (!readable || readableEnded) callback.call(stream);
117+
if (!readable || readableEnded) FunctionPrototypeCall(callback, stream);
114118
};
115119

116120
let readableEnded = stream.readableEnded ||
@@ -123,23 +127,25 @@ function eos(stream, options, callback) {
123127
if (stream.destroyed) willEmitClose = false;
124128

125129
if (willEmitClose && (!stream.writable || writable)) return;
126-
if (!writable || writableFinished) callback.call(stream);
130+
if (!writable || writableFinished) FunctionPrototypeCall(callback, stream);
127131
};
128132

129133
const onerror = (err) => {
130-
callback.call(stream, err);
134+
FunctionPrototypeCall(callback, stream, err);
131135
};
132136

133137
const onclose = () => {
134138
if (readable && !readableEnded) {
135139
if (!isReadableEnded(stream))
136-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
140+
return FunctionPrototypeCall(callback, stream,
141+
new ERR_STREAM_PREMATURE_CLOSE());
137142
}
138143
if (writable && !writableFinished) {
139144
if (!isWritableFinished(stream))
140-
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
145+
return FunctionPrototypeCall(callback, stream,
146+
new ERR_STREAM_PREMATURE_CLOSE());
141147
}
142-
callback.call(stream);
148+
FunctionPrototypeCall(callback, stream);
143149
};
144150

145151
const onrequest = () => {

lib/internal/streams/from.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
PromisePrototypeThen,
45
SymbolAsyncIterator,
56
SymbolIterator
67
} = primordials;
@@ -55,7 +56,8 @@ function from(Readable, iterable, opts) {
5556
readable._destroy = function(error, cb) {
5657
if (needToClose) {
5758
needToClose = false;
58-
close().then(
59+
PromisePrototypeThen(
60+
close(),
5961
() => process.nextTick(cb, error),
6062
(e) => process.nextTick(cb, error || e),
6163
);

lib/internal/streams/lazy_transform.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
ObjectDefineProperties,
88
ObjectDefineProperty,
99
ObjectSetPrototypeOf,
10+
ReflectApply,
1011
} = primordials;
1112

1213
const stream = require('stream');
@@ -25,7 +26,7 @@ ObjectSetPrototypeOf(LazyTransform, stream.Transform);
2526

2627
function makeGetter(name) {
2728
return function() {
28-
stream.Transform.call(this, this._options);
29+
ReflectApply(stream.Transform, this, [this._options]);
2930
this._writableState.decodeStrings = false;
3031

3132
if (!this._options || !this._options.defaultEncoding) {

lib/internal/streams/legacy.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
const {
44
ArrayIsArray,
5+
ArrayPrototypeUnshift,
6+
FunctionPrototypeCall,
57
ObjectSetPrototypeOf,
68
} = primordials;
79

810
const EE = require('events');
911

1012
function Stream(opts) {
11-
EE.call(this, opts);
13+
FunctionPrototypeCall(EE, this, opts);
1214
}
1315
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
1416
ObjectSetPrototypeOf(Stream, EE);
@@ -106,7 +108,7 @@ function prependListener(emitter, event, fn) {
106108
if (!emitter._events || !emitter._events[event])
107109
emitter.on(event, fn);
108110
else if (ArrayIsArray(emitter._events[event]))
109-
emitter._events[event].unshift(fn);
111+
ArrayPrototypeUnshift(emitter._events[event], fn);
110112
else
111113
emitter._events[event] = [fn, emitter._events[event]];
112114
}

lib/internal/streams/passthrough.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
'use strict';
2727

2828
const {
29+
FunctionPrototypeCall,
2930
ObjectSetPrototypeOf,
3031
} = primordials;
3132

@@ -39,7 +40,7 @@ function PassThrough(options) {
3940
if (!(this instanceof PassThrough))
4041
return new PassThrough(options);
4142

42-
Transform.call(this, options);
43+
FunctionPrototypeCall(Transform, this, options);
4344
}
4445

4546
PassThrough.prototype._transform = function(chunk, encoding, cb) {

lib/internal/streams/pipeline.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
const {
77
ArrayIsArray,
8+
ArrayPrototypePop,
9+
ArrayPrototypePush,
10+
ArrayPrototypeShift,
11+
FunctionPrototypeCall,
812
ReflectApply,
913
SymbolAsyncIterator,
1014
SymbolIterator,
@@ -75,7 +79,7 @@ function popCallback(streams) {
7579
// a single stream. Therefore optimize for the average case instead of
7680
// checking for length === 0 as well.
7781
validateCallback(streams[streams.length - 1]);
78-
return streams.pop();
82+
return ArrayPrototypePop(streams);
7983
}
8084

8185
function isReadable(obj) {
@@ -114,7 +118,7 @@ async function* fromReadable(val) {
114118
Readable = require('internal/streams/readable');
115119
}
116120

117-
yield* Readable.prototype[SymbolAsyncIterator].call(val);
121+
yield* FunctionPrototypeCall(Readable.prototype[SymbolAsyncIterator], val);
118122
}
119123

120124
async function pump(iterable, writable, finish) {
@@ -171,7 +175,7 @@ function pipeline(...streams) {
171175
}
172176

173177
while (destroys.length) {
174-
destroys.shift()(error);
178+
ArrayPrototypeShift(destroys)(error);
175179
}
176180

177181
if (final) {
@@ -187,7 +191,7 @@ function pipeline(...streams) {
187191

188192
if (isStream(stream)) {
189193
finishCount++;
190-
destroys.push(destroyer(stream, reading, writing, finish));
194+
ArrayPrototypePush(destroys, destroyer(stream, reading, writing, finish));
191195
}
192196

193197
if (i === 0) {
@@ -250,7 +254,7 @@ function pipeline(...streams) {
250254
ret = pt;
251255

252256
finishCount++;
253-
destroys.push(destroyer(ret, false, true, finish));
257+
ArrayPrototypePush(destroys, destroyer(ret, false, true, finish));
254258
}
255259
} else if (isStream(stream)) {
256260
if (isReadable(ret)) {

0 commit comments

Comments
 (0)