@@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('node:stream/promises')`
or `require('node:stream').promises`.

+ ### `stream.pipeline(source[, ...transforms], destination[, options])`
+
+ ### `stream.pipeline(streams[, options])`
+
+ <!-- YAML
+ added: v15.0.0
+ -->
+
+ * `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
+ * `source` {Stream|Iterable|AsyncIterable|Function}
+   * Returns: {Promise|AsyncIterable}
+ * `...transforms` {Stream|Function}
+   * `source` {AsyncIterable}
+   * Returns: {Promise|AsyncIterable}
+ * `destination` {Stream|Function}
+   * `source` {AsyncIterable}
+   * Returns: {Promise|AsyncIterable}
+ * `options` {Object}
+   * `signal` {AbortSignal}
+   * `end` {boolean}
+ * Returns: {Promise} Fulfills when the pipeline is complete.
+
+ ```cjs
+ const { pipeline } = require('node:stream/promises');
+ const fs = require('node:fs');
+ const zlib = require('node:zlib');
+
+ async function run() {
+   await pipeline(
+     fs.createReadStream('archive.tar'),
+     zlib.createGzip(),
+     fs.createWriteStream('archive.tar.gz'),
+   );
+   console.log('Pipeline succeeded.');
+ }
+
+ run().catch(console.error);
+ ```
+
+ ```mjs
+ import { pipeline } from 'node:stream/promises';
+ import { createReadStream, createWriteStream } from 'node:fs';
+ import { createGzip } from 'node:zlib';
+
+ await pipeline(
+   createReadStream('archive.tar'),
+   createGzip(),
+   createWriteStream('archive.tar.gz'),
+ );
+ console.log('Pipeline succeeded.');
+ ```
+
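+ As the signature above indicates, the `destination` may also be a function
+ rather than a stream. The following is a minimal sketch under that assumption,
+ not an example from the original text: it assumes the final async function
+ receives the previous stage as an async iterable, and the file name is
+ illustrative only.
+
+ ```mjs
+ import { pipeline } from 'node:stream/promises';
+ import { createReadStream } from 'node:fs';
+
+ await pipeline(
+   createReadStream('archive.tar'),
+   // Assumed usage: a plain async function as the final stage consumes the
+   // async iterable produced by the preceding stream.
+   async function (source) {
+     let bytes = 0;
+     for await (const chunk of source) {
+       bytes += chunk.length; // Process each chunk here.
+     }
+     console.log(`Consumed ${bytes} bytes.`);
+   },
+ );
+ ```
+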
+ To use an `AbortSignal`, pass it inside an options object as the last argument.
+ When the signal is aborted, `destroy` will be called on the underlying pipeline,
+ with an `AbortError`.
+
+ ```cjs
+ const { pipeline } = require('node:stream/promises');
+ const fs = require('node:fs');
+ const zlib = require('node:zlib');
+
+ async function run() {
+   const ac = new AbortController();
+   const signal = ac.signal;
+
+   setImmediate(() => ac.abort());
+   await pipeline(
+     fs.createReadStream('archive.tar'),
+     zlib.createGzip(),
+     fs.createWriteStream('archive.tar.gz'),
+     { signal },
+   );
+ }
+
+ run().catch(console.error); // AbortError
+ ```
+
+ ```mjs
+ import { pipeline } from 'node:stream/promises';
+ import { createReadStream, createWriteStream } from 'node:fs';
+ import { createGzip } from 'node:zlib';
+
+ const ac = new AbortController();
+ const { signal } = ac;
+ setImmediate(() => ac.abort());
+ try {
+   await pipeline(
+     createReadStream('archive.tar'),
+     createGzip(),
+     createWriteStream('archive.tar.gz'),
+     { signal },
+   );
+ } catch (err) {
+   console.error(err); // AbortError
+ }
+ ```
+
+ The `pipeline` API also supports async generators:
+
+ ```cjs
+ const { pipeline } = require('node:stream/promises');
+ const fs = require('node:fs');
+
+ async function run() {
+   await pipeline(
+     fs.createReadStream('lowercase.txt'),
+     async function* (source, { signal }) {
+       source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
+       for await (const chunk of source) {
+         yield await processChunk(chunk, { signal });
+       }
+     },
+     fs.createWriteStream('uppercase.txt'),
+   );
+   console.log('Pipeline succeeded.');
+ }
+
+ run().catch(console.error);
+ ```
+
+ ```mjs
+ import { pipeline } from 'node:stream/promises';
+ import { createReadStream, createWriteStream } from 'node:fs';
+
+ await pipeline(
+   createReadStream('lowercase.txt'),
+   async function* (source, { signal }) {
+     source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
+     for await (const chunk of source) {
+       yield await processChunk(chunk, { signal });
+     }
+   },
+   createWriteStream('uppercase.txt'),
+ );
+ console.log('Pipeline succeeded.');
+ ```
+
+ Remember to handle the `signal` argument passed into the async generator,
+ especially when the async generator is the source for the pipeline (i.e. the
+ first argument); otherwise the pipeline will never complete.
+
+ ```cjs
+ const { pipeline } = require('node:stream/promises');
+ const fs = require('node:fs');
+
+ async function run() {
+   await pipeline(
+     async function* ({ signal }) {
+       await someLongRunningfn({ signal });
+       yield 'asd';
+     },
+     fs.createWriteStream('uppercase.txt'),
+   );
+   console.log('Pipeline succeeded.');
+ }
+
+ run().catch(console.error);
+ ```
+
+ ```mjs
+ import { pipeline } from 'node:stream/promises';
+ import fs from 'node:fs';
+ await pipeline(
+   async function* ({ signal }) {
+     await someLongRunningfn({ signal });
+     yield 'asd';
+   },
+   fs.createWriteStream('uppercase.txt'),
+ );
+ console.log('Pipeline succeeded.');
+ ```
+
+ The `pipeline` API also provides a [callback version][stream-pipeline].
+
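+ As a rough illustration of that callback form (this sketch mirrors the promise
+ examples above rather than quoting the linked section), the final argument is a
+ callback that is invoked once the pipeline has fully completed or failed:
+
+ ```cjs
+ const { pipeline } = require('node:stream');
+ const fs = require('node:fs');
+ const zlib = require('node:zlib');
+
+ pipeline(
+   fs.createReadStream('archive.tar'),
+   zlib.createGzip(),
+   fs.createWriteStream('archive.tar.gz'),
+   (err) => {
+     // The callback receives an error when any stream in the pipeline fails.
+     if (err) {
+       console.error('Pipeline failed.', err);
+     } else {
+       console.log('Pipeline succeeded.');
+     }
+   },
+ );
+ ```
+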
+ ### `stream.finished(stream[, options])`
+
+ <!-- YAML
+ added: v15.0.0
+ -->
+
+ * `stream` {Stream}
+ * `options` {Object}
+   * `error` {boolean|undefined}
+   * `readable` {boolean|undefined}
+   * `writable` {boolean|undefined}
+   * `signal` {AbortSignal|undefined}
+ * Returns: {Promise} Fulfills when the stream is no longer readable or
+   writable.
+
+ ```cjs
+ const { finished } = require('node:stream/promises');
+ const fs = require('node:fs');
+
+ const rs = fs.createReadStream('archive.tar');
+
+ async function run() {
+   await finished(rs);
+   console.log('Stream is done reading.');
+ }
+
+ run().catch(console.error);
+ rs.resume(); // Drain the stream.
+ ```
+
+ ```mjs
+ import { finished } from 'node:stream/promises';
+ import { createReadStream } from 'node:fs';
+
+ const rs = createReadStream('archive.tar');
+
+ async function run() {
+   await finished(rs);
+   console.log('Stream is done reading.');
+ }
+
+ run().catch(console.error);
+ rs.resume(); // Drain the stream.
+ ```
+
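+ The `signal` option listed above can be used to stop waiting for the stream.
+ A minimal sketch, assuming that aborting the signal rejects the pending
+ promise with an `AbortError` (the immediate abort is for illustration only):
+
+ ```mjs
+ import { finished } from 'node:stream/promises';
+ import { createReadStream } from 'node:fs';
+
+ const rs = createReadStream('archive.tar');
+
+ const ac = new AbortController();
+ setImmediate(() => ac.abort()); // Abort on the next turn of the event loop.
+
+ try {
+   await finished(rs, { signal: ac.signal });
+   console.log('Stream is done reading.');
+ } catch (err) {
+   console.error(err); // Expected to be an AbortError (assumption).
+ }
+ ```
+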
+ The `finished` API also provides a [callback version][stream-finished].
+
### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -2425,22 +2646,7 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

- The `finished` API provides promise version:
-
- ```js
- const { finished } = require('node:stream/promises');
- const fs = require('node:fs');
-
- const rs = fs.createReadStream('archive.tar');
-
- async function run() {
-   await finished(rs);
-   console.log('Stream is done reading.');
- }
-
- run().catch(console.error);
- rs.resume(); // Drain the stream.
- ```
+ The `finished` API provides a [promise version][stream-finished-promise].

`stream.finished()` leaves dangling event listeners (in particular
`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
@@ -2520,97 +2726,7 @@ pipeline(
);
```

- The `pipeline` API provides a promise version, which can also
- receive an options argument as the last parameter with a
- `signal` {AbortSignal} property. When the signal is aborted,
- `destroy` will be called on the underlying pipeline, with an
- `AbortError`.
-
- ```js
- const { pipeline } = require('node:stream/promises');
- const fs = require('node:fs');
- const zlib = require('node:zlib');
-
- async function run() {
-   await pipeline(
-     fs.createReadStream('archive.tar'),
-     zlib.createGzip(),
-     fs.createWriteStream('archive.tar.gz'),
-   );
-   console.log('Pipeline succeeded.');
- }
-
- run().catch(console.error);
- ```
-
- To use an `AbortSignal`, pass it inside an options object,
- as the last argument:
-
- ```js
- const { pipeline } = require('node:stream/promises');
- const fs = require('node:fs');
- const zlib = require('node:zlib');
-
- async function run() {
-   const ac = new AbortController();
-   const signal = ac.signal;
-
-   setTimeout(() => ac.abort(), 1);
-   await pipeline(
-     fs.createReadStream('archive.tar'),
-     zlib.createGzip(),
-     fs.createWriteStream('archive.tar.gz'),
-     { signal },
-   );
- }
-
- run().catch(console.error); // AbortError
- ```
-
- The `pipeline` API also supports async generators:
-
- ```js
- const { pipeline } = require('node:stream/promises');
- const fs = require('node:fs');
-
- async function run() {
-   await pipeline(
-     fs.createReadStream('lowercase.txt'),
-     async function* (source, { signal }) {
-       source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
-       for await (const chunk of source) {
-         yield await processChunk(chunk, { signal });
-       }
-     },
-     fs.createWriteStream('uppercase.txt'),
-   );
-   console.log('Pipeline succeeded.');
- }
-
- run().catch(console.error);
- ```
-
- Remember to handle the `signal` argument passed into the async generator.
- Especially in the case where the async generator is the source for the
- pipeline (i.e. first argument) or the pipeline will never complete.
-
- ```js
- const { pipeline } = require('node:stream/promises');
- const fs = require('node:fs');
-
- async function run() {
-   await pipeline(
-     async function* ({ signal }) {
-       await someLongRunningfn({ signal });
-       yield 'asd';
-     },
-     fs.createWriteStream('uppercase.txt'),
-   );
-   console.log('Pipeline succeeded.');
- }
-
- run().catch(console.error);
- ```
+ The `pipeline` API provides a [promise version][stream-pipeline-promise].

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
@@ -4544,7 +4660,11 @@ contain multi-byte characters.
[stream-_write]: #writable_writechunk-encoding-callback
[stream-_writev]: #writable_writevchunks-callback
[stream-end]: #writableendchunk-encoding-callback
+ [stream-finished]: #streamfinishedstream-options-callback
+ [stream-finished-promise]: #streamfinishedstream-options
[stream-pause]: #readablepause
+ [stream-pipeline]: #streampipelinesource-transforms-destination-callback
+ [stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
[stream-push]: #readablepushchunk-encoding
[stream-read]: #readablereadsize
[stream-resume]: #readableresume