|
2 | 2 |
|
3 | 3 | const pipeline = require('internal/streams/pipeline'); |
4 | 4 | const Duplex = require('internal/streams/duplex'); |
5 | | -const { createDeferredPromise } = require('internal/util'); |
6 | 5 | const { destroyer } = require('internal/streams/destroy'); |
7 | | -const from = require('internal/streams/from'); |
8 | 6 | const { |
9 | 7 | isNodeStream, |
10 | | - isIterable, |
11 | 8 | isReadable, |
12 | 9 | isWritable, |
13 | 10 | } = require('internal/streams/utils'); |
14 | | -const { |
15 | | - PromiseResolve, |
16 | | -} = primordials; |
17 | 11 | const { |
18 | 12 | AbortError, |
19 | 13 | codes: { |
20 | | - ERR_INVALID_ARG_TYPE, |
21 | 14 | ERR_INVALID_ARG_VALUE, |
22 | | - ERR_INVALID_RETURN_VALUE, |
23 | 15 | ERR_MISSING_ARGS, |
24 | 16 | }, |
25 | 17 | } = require('internal/errors'); |
26 | | -const assert = require('internal/assert'); |
27 | | - |
28 | | -// This is needed for pre node 17. |
29 | | -class ComposeDuplex extends Duplex { |
30 | | - constructor(options) { |
31 | | - super(options); |
32 | | - |
33 | | - // https://github.com/nodejs/node/pull/34385 |
34 | | - |
35 | | - if (options?.readable === false) { |
36 | | - this._readableState.readable = false; |
37 | | - this._readableState.ended = true; |
38 | | - this._readableState.endEmitted = true; |
39 | | - } |
40 | | - |
41 | | - if (options?.writable === false) { |
42 | | - this._writableState.writable = false; |
43 | | - this._writableState.ending = true; |
44 | | - this._writableState.ended = true; |
45 | | - this._writableState.finished = true; |
46 | | - } |
47 | | - } |
48 | | -} |
49 | 18 |
|
50 | 19 | module.exports = function compose(...streams) { |
51 | 20 | if (streams.length === 0) { |
52 | 21 | throw new ERR_MISSING_ARGS('streams'); |
53 | 22 | } |
54 | 23 |
|
55 | 24 | if (streams.length === 1) { |
56 | | - return makeDuplex(streams[0], 'streams[0]'); |
| 25 | + return Duplex.from(streams[0]); |
57 | 26 | } |
58 | 27 |
|
59 | 28 | const orgStreams = [...streams]; |
60 | 29 |
|
61 | 30 | if (typeof streams[0] === 'function') { |
62 | | - streams[0] = makeDuplex(streams[0], 'streams[0]'); |
| 31 | + streams[0] = Duplex.from(streams[0]); |
63 | 32 | } |
64 | 33 |
|
65 | 34 | if (typeof streams[streams.length - 1] === 'function') { |
66 | 35 | const idx = streams.length - 1; |
67 | | - streams[idx] = makeDuplex(streams[idx], `streams[${idx}]`); |
| 36 | + streams[idx] = Duplex.from(streams[idx]); |
68 | 37 | } |
69 | 38 |
|
70 | 39 | for (let n = 0; n < streams.length; ++n) { |
@@ -116,8 +85,8 @@ module.exports = function compose(...streams) { |
116 | 85 | // TODO(ronag): Avoid double buffering. |
117 | 86 | // Implement Writable/Readable/Duplex traits. |
118 | 87 | // See, https://github.com/nodejs/node/pull/33515. |
119 | | - d = new ComposeDuplex({ |
120 | | - highWaterMark: 1, |
| 88 | + d = new Duplex({ |
| 89 | + // TODO (ronag): highWaterMark? |
121 | 90 | writableObjectMode: !!head?.writableObjectMode, |
122 | 91 | readableObjectMode: !!tail?.writableObjectMode, |
123 | 92 | writable, |
@@ -203,82 +172,3 @@ module.exports = function compose(...streams) { |
203 | 172 |
|
204 | 173 | return d; |
205 | 174 | }; |
206 | | - |
207 | | -function makeDuplex(stream, name) { |
208 | | - let ret; |
209 | | - if (typeof stream === 'function') { |
210 | | - assert(stream.length > 0); |
211 | | - |
212 | | - const { value, write, final } = fromAsyncGen(stream); |
213 | | - |
214 | | - if (isIterable(value)) { |
215 | | - ret = from(ComposeDuplex, value, { |
216 | | - objectMode: true, |
217 | | - highWaterMark: 1, |
218 | | - write, |
219 | | - final |
220 | | - }); |
221 | | - } else if (typeof value?.then === 'function') { |
222 | | - const promise = PromiseResolve(value) |
223 | | - .then((val) => { |
224 | | - if (val != null) { |
225 | | - throw new ERR_INVALID_RETURN_VALUE('nully', name, val); |
226 | | - } |
227 | | - }) |
228 | | - .catch((err) => { |
229 | | - destroyer(ret, err); |
230 | | - }); |
231 | | - |
232 | | - ret = new ComposeDuplex({ |
233 | | - objectMode: true, |
234 | | - highWaterMark: 1, |
235 | | - readable: false, |
236 | | - write, |
237 | | - final(cb) { |
238 | | - final(() => promise.then(cb, cb)); |
239 | | - } |
240 | | - }); |
241 | | - } else { |
242 | | - throw new ERR_INVALID_RETURN_VALUE( |
243 | | - 'Iterable, AsyncIterable or AsyncFunction', name, value); |
244 | | - } |
245 | | - } else if (isNodeStream(stream)) { |
246 | | - ret = stream; |
247 | | - } else if (isIterable(stream)) { |
248 | | - ret = from(ComposeDuplex, stream, { |
249 | | - objectMode: true, |
250 | | - highWaterMark: 1, |
251 | | - writable: false |
252 | | - }); |
253 | | - } else { |
254 | | - throw new ERR_INVALID_ARG_TYPE( |
255 | | - name, |
256 | | - ['Stream', 'Iterable', 'AsyncIterable', 'Function'], |
257 | | - stream) |
258 | | - ; |
259 | | - } |
260 | | - return ret; |
261 | | -} |
262 | | - |
263 | | -function fromAsyncGen(fn) { |
264 | | - let { promise, resolve } = createDeferredPromise(); |
265 | | - const value = fn(async function*() { |
266 | | - while (true) { |
267 | | - const { chunk, done, cb } = await promise; |
268 | | - process.nextTick(cb); |
269 | | - if (done) return; |
270 | | - yield chunk; |
271 | | - ({ promise, resolve } = createDeferredPromise()); |
272 | | - } |
273 | | - }()); |
274 | | - |
275 | | - return { |
276 | | - value, |
277 | | - write(chunk, encoding, cb) { |
278 | | - resolve({ chunk, done: false, cb }); |
279 | | - }, |
280 | | - final(cb) { |
281 | | - resolve({ done: true, cb }); |
282 | | - } |
283 | | - }; |
284 | | -} |
0 commit comments