-
-
Notifications
You must be signed in to change notification settings - Fork 31.8k
stream: enable usage of webstreams on compose() #46675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
505d884
a1cca78
bcd122a
62f5372
20fd566
4e1d499
f3c6214
8ce94be
838f5ef
183f475
233115d
002eed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,10 @@ const { | |
isNodeStream, | ||
isReadable, | ||
isWritable, | ||
isWebStream, | ||
isTransformStream, | ||
isWritableStream, | ||
isReadableStream, | ||
} = require('internal/streams/utils'); | ||
const { | ||
AbortError, | ||
|
@@ -15,6 +19,7 @@ const { | |
ERR_MISSING_ARGS, | ||
}, | ||
} = require('internal/errors'); | ||
const eos = require('internal/streams/end-of-stream'); | ||
|
||
module.exports = function compose(...streams) { | ||
if (streams.length === 0) { | ||
|
@@ -37,18 +42,32 @@ module.exports = function compose(...streams) { | |
} | ||
|
||
for (let n = 0; n < streams.length; ++n) { | ||
if (!isNodeStream(streams[n])) { | ||
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { | ||
// TODO(ronag): Add checks for non streams. | ||
continue; | ||
} | ||
if (n < streams.length - 1 && !isReadable(streams[n])) { | ||
if ( | ||
n < streams.length - 1 && | ||
!( | ||
isReadable(streams[n]) || | ||
isReadableStream(streams[n]) || | ||
isTransformStream(streams[n]) | ||
) | ||
) { | ||
throw new ERR_INVALID_ARG_VALUE( | ||
`streams[${n}]`, | ||
orgStreams[n], | ||
'must be readable' | ||
); | ||
} | ||
if (n > 0 && !isWritable(streams[n])) { | ||
if ( | ||
n > 0 && | ||
!( | ||
isWritable(streams[n]) || | ||
isWritableStream(streams[n]) || | ||
isTransformStream(streams[n]) | ||
) | ||
) { | ||
throw new ERR_INVALID_ARG_VALUE( | ||
`streams[${n}]`, | ||
orgStreams[n], | ||
|
@@ -79,8 +98,16 @@ module.exports = function compose(...streams) { | |
const head = streams[0]; | ||
const tail = pipeline(streams, onfinished); | ||
|
||
const writable = !!isWritable(head); | ||
const readable = !!isReadable(tail); | ||
const writable = !!( | ||
isWritable(head) || | ||
isWritableStream(head) || | ||
isTransformStream(head) | ||
); | ||
const readable = !!( | ||
isReadable(tail) || | ||
isReadableStream(tail) || | ||
isTransformStream(tail) | ||
); | ||
|
||
// TODO(ronag): Avoid double buffering. | ||
// Implement Writable/Readable/Duplex traits. | ||
|
@@ -94,28 +121,55 @@ module.exports = function compose(...streams) { | |
}); | ||
|
||
if (writable) { | ||
d._write = function(chunk, encoding, callback) { | ||
if (head.write(chunk, encoding)) { | ||
callback(); | ||
} else { | ||
ondrain = callback; | ||
} | ||
}; | ||
|
||
d._final = function(callback) { | ||
head.end(); | ||
onfinish = callback; | ||
}; | ||
if (isNodeStream(head)) { | ||
d._write = function(chunk, encoding, callback) { | ||
if (head.write(chunk, encoding)) { | ||
callback(); | ||
} else { | ||
ondrain = callback; | ||
} | ||
}; | ||
|
||
d._final = function(callback) { | ||
head.end(); | ||
onfinish = callback; | ||
}; | ||
|
||
head.on('drain', function() { | ||
if (ondrain) { | ||
const cb = ondrain; | ||
ondrain = null; | ||
cb(); | ||
} | ||
}); | ||
} else if (isWebStream(head)) { | ||
const writable = isTransformStream(head) ? head.writable : head; | ||
const writer = writable.getWriter(); | ||
|
||
d._write = async function(chunk, encoding, callback) { | ||
try { | ||
await writer.ready; | ||
writer.write(chunk).catch(() => {}); | ||
callback(); | ||
} catch (err) { | ||
callback(err); | ||
} | ||
}; | ||
|
||
d._final = async function(callback) { | ||
try { | ||
await writer.ready; | ||
writer.close().catch(() => {}); | ||
onfinish = callback; | ||
} catch (err) { | ||
callback(err); | ||
} | ||
}; | ||
} | ||
|
||
head.on('drain', function() { | ||
if (ondrain) { | ||
const cb = ondrain; | ||
ondrain = null; | ||
cb(); | ||
} | ||
}); | ||
const toRead = isTransformStream(tail) ? tail.readable : tail; | ||
|
||
tail.on('finish', function() { | ||
eos(toRead, () => { | ||
if (onfinish) { | ||
const cb = onfinish; | ||
onfinish = null; | ||
|
@@ -125,32 +179,54 @@ module.exports = function compose(...streams) { | |
} | ||
|
||
if (readable) { | ||
tail.on('readable', function() { | ||
if (onreadable) { | ||
const cb = onreadable; | ||
onreadable = null; | ||
cb(); | ||
} | ||
}); | ||
|
||
tail.on('end', function() { | ||
d.push(null); | ||
}); | ||
|
||
d._read = function() { | ||
while (true) { | ||
const buf = tail.read(); | ||
|
||
if (buf === null) { | ||
onreadable = d._read; | ||
return; | ||
if (isNodeStream(tail)) { | ||
tail.on('readable', function() { | ||
if (onreadable) { | ||
const cb = onreadable; | ||
onreadable = null; | ||
cb(); | ||
} | ||
|
||
if (!d.push(buf)) { | ||
return; | ||
}); | ||
|
||
tail.on('end', function() { | ||
d.push(null); | ||
}); | ||
|
||
d._read = function() { | ||
while (true) { | ||
const buf = tail.read(); | ||
if (buf === null) { | ||
onreadable = d._read; | ||
return; | ||
} | ||
|
||
if (!d.push(buf)) { | ||
return; | ||
} | ||
} | ||
} | ||
}; | ||
}; | ||
} else if (isWebStream(tail)) { | ||
const readable = isTransformStream(tail) ? tail.readable : tail; | ||
const reader = readable.getReader(); | ||
d._read = async function() { | ||
while (true) { | ||
try { | ||
const { value, done } = await reader.read(); | ||
|
||
if (!d.push(value)) { | ||
return; | ||
} | ||
|
||
if (done) { | ||
d.push(null); | ||
return; | ||
} | ||
} catch { | ||
return; | ||
} | ||
} | ||
}; | ||
} | ||
} | ||
|
||
d._destroy = function(err, callback) { | ||
|
@@ -166,7 +242,9 @@ module.exports = function compose(...streams) { | |
callback(err); | ||
} else { | ||
onclose = callback; | ||
destroyer(tail, err); | ||
if (isNodeStream(tail)) { | ||
destroyer(tail, err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some help is needed here, how could we destroy webstreams here? or should we even? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you elaborate on the question? (We can destroy web streams the question is what scenario do you specifically mean) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the pipeline encounters an error, it would call d.destroy, which in turn would destroy the last stream in the series the Actually, i am a little confused why destroying the last stream is necessary 😅 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one question remain wdyt @benjamingr ? |
||
} | ||
} | ||
}; | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.