Skip to content

Commit

Permalink
stream: improve WebStreams performance
Browse files Browse the repository at this point in the history
PR-URL: nodejs#49089
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
rluvaton authored Aug 13, 2023
1 parent ee8b7f1 commit 9fc5700
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 64 deletions.
66 changes: 25 additions & 41 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const kChunk = Symbol('kChunk');
const kError = Symbol('kError');
const kPull = Symbol('kPull');
const kRelease = Symbol('kRelease');
const kSkipThrow = Symbol('kSkipThrow');

let releasedError;
let releasingError;
Expand Down Expand Up @@ -675,8 +676,10 @@ TransferredReadableStream.prototype[kDeserialize] = () => {};
class ReadableStreamBYOBRequest {
[kType] = 'ReadableStreamBYOBRequest';

constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
constructor(skipThrowSymbol = undefined) {
if (skipThrowSymbol !== kSkipThrow) {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
}

/**
Expand Down Expand Up @@ -758,17 +761,14 @@ ObjectDefineProperties(ReadableStreamBYOBRequest.prototype, {
});

function createReadableStreamBYOBRequest(controller, view) {
return ReflectConstruct(
function() {
this[kType] = 'ReadableStreamBYOBRequest';
this[kState] = {
controller,
view,
};
},
[],
ReadableStreamBYOBRequest,
);
const stream = new ReadableStreamBYOBRequest(kSkipThrow);

stream[kState] = {
controller,
view,
};

return stream;
}

class DefaultReadRequest {
Expand Down Expand Up @@ -1018,9 +1018,12 @@ ObjectDefineProperties(ReadableStreamBYOBReader.prototype, {

class ReadableStreamDefaultController {
[kType] = 'ReadableStreamDefaultController';
[kState] = {};

constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
constructor(skipThrowSymbol = undefined) {
if (skipThrowSymbol !== kSkipThrow) {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
}

/**
Expand Down Expand Up @@ -1076,22 +1079,14 @@ ObjectDefineProperties(ReadableStreamDefaultController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableStreamDefaultController.name),
});

function createReadableStreamDefaultController() {
return ReflectConstruct(
function() {
this[kType] = 'ReadableStreamDefaultController';
this[kState] = {};
},
[],
ReadableStreamDefaultController,
);
}

class ReadableByteStreamController {
[kType] = 'ReadableByteStreamController';
[kState] = {};

constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
constructor(skipThrowSymbol = undefined) {
if (skipThrowSymbol !== kSkipThrow) {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
}

/**
Expand Down Expand Up @@ -1202,17 +1197,6 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});

function createReadableByteStreamController() {
return ReflectConstruct(
function() {
this[kType] = 'ReadableByteStreamController';
this[kState] = {};
},
[],
ReadableByteStreamController,
);
}

function createTeeReadableStream(start, pull, cancel) {
return ReflectConstruct(
function() {
Expand Down Expand Up @@ -2415,7 +2399,7 @@ function setupReadableStreamDefaultControllerFromSource(
source,
highWaterMark,
sizeAlgorithm) {
const controller = createReadableStreamDefaultController();
const controller = new ReadableStreamDefaultController(kSkipThrow);
const start = source?.start;
const pull = source?.pull;
const cancel = source?.cancel;
Expand Down Expand Up @@ -3213,7 +3197,7 @@ function setupReadableByteStreamControllerFromSource(
stream,
source,
highWaterMark) {
const controller = createReadableByteStreamController();
const controller = new ReadableByteStreamController(kSkipThrow);
const start = source?.start;
const pull = source?.pull;
const cancel = source?.cancel;
Expand Down
20 changes: 8 additions & 12 deletions lib/internal/webstreams/transformstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
PromiseResolve,
ReflectConstruct,
SymbolToStringTag,
Symbol,
} = primordials;

const {
Expand Down Expand Up @@ -65,6 +66,8 @@ const {

const assert = require('internal/assert');

const kSkipThrow = Symbol('kSkipThrow');

const getNonWritablePropertyDescriptor = (value) => {
return {
__proto__: null,
Expand Down Expand Up @@ -268,8 +271,10 @@ TransferredTransformStream.prototype[kDeserialize] = () => {};
class TransformStreamDefaultController {
[kType] = 'TransformStreamDefaultController';

constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
constructor(skipThrowSymbol = undefined) {
if (skipThrowSymbol !== kSkipThrow) {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
}

/**
Expand Down Expand Up @@ -330,15 +335,6 @@ ObjectDefineProperties(TransformStreamDefaultController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(TransformStreamDefaultController.name),
});

function createTransformStreamDefaultController() {
return ReflectConstruct(
function() {
this[kType] = 'TransformStreamDefaultController';
},
[],
TransformStreamDefaultController);
}

const isTransformStream =
isBrandCheck('TransformStream');
const isTransformStreamDefaultController =
Expand Down Expand Up @@ -453,7 +449,7 @@ function setupTransformStreamDefaultController(
function setupTransformStreamDefaultControllerFromTransformer(
stream,
transformer) {
const controller = createTransformStreamDefaultController();
const controller = new TransformStreamDefaultController(kSkipThrow);
const transform = transformer?.transform || defaultTransformAlgorithm;
const flush = transformer?.flush || nonOpFlush;
const transformAlgorithm =
Expand Down
17 changes: 6 additions & 11 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const assert = require('internal/assert');
const kAbort = Symbol('kAbort');
const kCloseSentinel = Symbol('kCloseSentinel');
const kError = Symbol('kError');
const kSkipThrow = Symbol('kSkipThrow');

let releasedError;

Expand Down Expand Up @@ -522,8 +523,10 @@ ObjectDefineProperties(WritableStreamDefaultWriter.prototype, {
class WritableStreamDefaultController {
[kType] = 'WritableStreamDefaultController';

constructor() {
throw new ERR_ILLEGAL_CONSTRUCTOR();
constructor(skipThrowSymbol = undefined) {
if (skipThrowSymbol !== kSkipThrow) {
throw new ERR_ILLEGAL_CONSTRUCTOR();
}
}

[kAbort](reason) {
Expand Down Expand Up @@ -569,14 +572,6 @@ ObjectDefineProperties(WritableStreamDefaultController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name),
});

function createWritableStreamDefaultController() {
return ReflectConstruct(
function() {
this[kType] = 'WritableStreamDefaultController';
},
[], WritableStreamDefaultController);
}

const isWritableStream =
isBrandCheck('WritableStream');
const isWritableStreamDefaultWriter =
Expand Down Expand Up @@ -1233,7 +1228,7 @@ function setupWritableStreamDefaultControllerFromSink(
sink,
highWaterMark,
sizeAlgorithm) {
const controller = createWritableStreamDefaultController();
const controller = new WritableStreamDefaultController(kSkipThrow);
const start = sink?.start;
const write = sink?.write;
const close = sink?.close;
Expand Down

0 comments on commit 9fc5700

Please sign in to comment.