Skip to content

Commit

Permalink
stream: add webstreams to duplex
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Jan 7, 2023
1 parent 6763932 commit 4937598
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 15 deletions.
38 changes: 23 additions & 15 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
const { destroyer } = require('internal/streams/destroy');
const Duplex = require('internal/streams/duplex');
const Readable = require('internal/streams/readable');
const Writable = require('internal/streams/writable');
const { createDeferredPromise } = require('internal/util');
const from = require('internal/streams/from');

Expand All @@ -32,6 +33,16 @@ const {
FunctionPrototypeCall
} = primordials;

const {
isBrandCheck,
} = require('internal/webstreams/util');
const console = require('console');

const isReadableStream =
isBrandCheck('ReadableStream');
const isWritableStream =
isBrandCheck('WritableStream');

// This is needed for pre node 17.
class Duplexify extends Duplex {
constructor(options) {
Expand Down Expand Up @@ -71,15 +82,13 @@ module.exports = function duplexify(body, name) {
return _duplexify({ writable: false, readable: false });
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }
if (isReadableStream(body)) {
return _duplexify({ readable: Readable.fromWeb(body) });
}

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }
if (isWritableStream(body)) {
return _duplexify({ writable: Writable.fromWeb(body) });
}

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);
Expand Down Expand Up @@ -146,13 +155,12 @@ module.exports = function duplexify(body, name) {
});
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }
if (
isReadableStream(body?.readable) &&
isWritableStream(body?.writable)
) {
return Duplexify.fromWeb(body);
}

if (
typeof body?.writable === 'object' ||
Expand Down
128 changes: 128 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const common = require('../common');
const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');

{
Expand Down Expand Up @@ -299,3 +300,130 @@ const { Blob } = require('buffer');
assert.strictEqual(res, 'foobar');
})).on('close', common.mustCall());
}

{
const d = Duplex.from({
readable: new ReadableStream({
start(controller) {
controller.enqueue('foo');
controller.close();
},
}),
});
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);
d.once(
'readable',
common.mustCall(() => {
assert.strictEqual(d.read().toString(), 'foo');
})
);
// assert.strictEqual(d.readable, false);
}

{
const d = Duplex.from(
new ReadableStream({
start(controller) {
controller.enqueue('foo');
controller.close();
},
})
);
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, false);
d.once(
'readable',
common.mustCall(() => {
assert.strictEqual(d.read().toString(), 'foo');
})
);
// d.once(
// 'end',
// common.mustCall(() => {
// assert.strictEqual(d.readable, false);
// })
// );
}

{
let str = '';
const d = Duplex.from({
writable: new WritableStream({
write(chunk) {
str += chunk;
},
}),
});
assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);
d.end('foo');
d.on(
'finish',
common.mustCall(() => {
assert.strictEqual(d.writable, false);
assert.strictEqual(str, 'foo');
})
);
}

{
let str = '';
const d = Duplex.from(
new WritableStream({
write(chunk) {
str += chunk;
},
})
);
assert.strictEqual(d.readable, false);
assert.strictEqual(d.writable, true);
d.end('foo');
d.on(
'finish',
common.mustCall(() => {
assert.strictEqual(d.writable, false);
assert.strictEqual(str, 'foo');
})
);
}

{
let ret = '';
const d = Duplex.from({
readable: new Readable({
read() {
this.push('foo');
this.push(null);
},
}),
writable: new Writable({
write(chunk, encoding, callback) {
ret += chunk;
callback();
},
}),
});
assert.strictEqual(d.readable, true);
assert.strictEqual(d.writable, true);
d.once(
'readable',
common.mustCall(function () {
assert.strictEqual(d.read().toString(), 'foo');
})
);
d.once(
'end',
common.mustCall(function () {
assert.strictEqual(d.readable, false);
})
);
d.end('asd');
d.once(
'finish',
common.mustCall(function () {
assert.strictEqual(d.writable, false);
assert.strictEqual(ret, 'asd');
})
);
}

0 comments on commit 4937598

Please sign in to comment.