@@ -10,6 +10,47 @@ Node.js中 stream 模块用于构建实现了 流 接口的对象。
1010
1111const stream = require('stream');
1212
13+ ## Stream 流
14+
15+ 流是用来比喻数据传输的一种形式,数据传输的起点就是流的源头,数据传输的终点就是流的终点。例如在网页发起一个 HTTP 请求,浏览器就是流的源头,服务器就是流的终点。等服务器处理完请求,返回响应时,服务器就变成了流的源头,浏览器变成了流的终点。
16+
17+ 数据从一端连续不断地传输到另一端,就像水一样从一端流到另一端,所以用流来比喻数据的传输形式。只不过计算机中的流传输的是数据(字节),而不是水。
18+
19+ 在 Node.js 中,stream 模块提供了用于实现流接口的 API。但是很多内置模块都提供了关于流的 API,所以通常不需要显式的调用 stream 模块来使用流。
20+
21+ pipe() 将可写流绑定到可读流,使其自动切换到流动模式并将其所有数据推送到绑定的可写流。 数据流将被自动管理,以便目标可写流不会被更快的可读流漫过。也就是说,pipe() 将数据缓冲限制在可接受的水平,以便不同速度的来源和目标不会压倒可用内存。
22+
23+
24+ ## 流的类型
25+
26+ Node.js 中有四种基本的流类型:
27+
28+ Readable: 可读流,可以从中读取数据的流(例如,fs.createReadStream())。
29+
30+ Writable: 可写流,可以写入数据的流(例如,fs.createWriteStream())。
31+
32+ Duplex: 双工流,Readable 和 Writable 的流(例如,net.Socket)。
33+
34+ Transform: 可以在写入和读取数据时修改或转换数据的 Duplex 流(例如,zlib.createDeflate())。
35+
36+ ## 缓冲
37+
38+ Writable 和 Readable 流都将数据存储在内部缓冲区中。
39+
40+ 可能缓冲的数据量取决于传给流的构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 选项指定字节的总数。
41+
42+ 当实现调用 stream.push(chunk) 时,数据缓存在 Readable 流中。 如果流的消费者没有调用 stream.read(),则数据会一直驻留在内部队列中,直到被消费。
43+
44+ 一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,则流将暂时停止从底层资源读取数据,直到可以消费当前缓冲的数据(也就是,流将停止调用内部的用于填充读取缓冲区 readable._ read() 方法)。
45+
46+ 当重复调用 writable.write(chunk) 方法时,数据会缓存在 Writable 流中。 虽然内部的写入缓冲区的总大小低于 highWaterMark 设置的阈值,但对 writable.write() 的调用将返回 true。 一旦内部缓冲区的大小达到或超过 highWaterMark,则将返回 false。
47+
48+ stream API 的一个关键目标,尤其是 stream.pipe() 方法,是将数据缓冲限制在可接受的水平,以便不同速度的来源和目标不会压倒可用内存。
49+
50+ highWaterMark 选项是阈值,而不是限制:它规定了流在停止请求更多数据之前缓冲的数据量。 它通常不强制执行严格的内存限制。 特定的流实现可能会选择实施更严格的限制,但这样做是可选的。
51+
52+ 由于 Duplex 和 Transform 流都是 Readable 和 Writable,因此每个流都维护两个独立的内部缓冲区,用于读取和写入,允许每一端独立操作,同时保持适当且高效的数据流。 例如,net.Socket 实例是 Duplex 流,其 Readable 端允许消费从套接字接收的数据,其 Writable 端允许将数据写入套接字。 因为数据可能以比接收数据更快或更慢的速度写入套接字,所以每一端都应该独立于另一端进行操作(和缓冲)。
53+
1354## 流的类型
1455
15564种基本的流类型
@@ -348,12 +389,179 @@ print(fs.createReadStream('file')).catch(console.log);
348389
349390调用 stream.pipe()
350391
392+ 可读流可以通过以下方式 切换回 暂停模式
393+
394+ - 如果没有管道目标,则调用 stream.pause()
395+ - 如果有管道目标,则 移除所有管道目标。 调用 stream.unpipe() 可以移除多个管道目标。
396+
397+
398+ 只有提供了消费或忽略数据的机制后,可读流才会产生数据。如果消费的机制被禁用或移除,则可读流会停止产生数据。
399+
400+ 为了向后兼容,移除data事件句柄不会自动地暂停流。如果有管道目标,一旦目标变为 drain 状态并请求接收数据时,则调用 stream.pause() 也不能保证流会保持暂停模式。
401+
402+ 如果可读流切换到流动模式,且没有可用的“消费者”来处理数据,则数据将会丢失。例如,当调用 readable.resume() 时,没有监听 data 事件 或 data 事件句柄已移除。
403+
404+ 添加 readable 事件句柄会使流自动停止流动,并通过 readable.read() 消费数据。 如果 readable 事件句柄被移除,且存在 data 事件句柄,则流会再次开始流动。
405+
406+ ## 可写流
407+
408+ 可写流是对数据要被写入的目的地的一种抽象。所有可写流都实现了 stream.Writable 类定义的接口。 可写流常见的例子包括客户端的 HTTP 请求、服务器的HTTP响应、fs的写入流、zlib流、 crypto 流,TCP socket、子进程 stdin、process.stdout、 process.stderr。
409+
410+ ``` js
411+ const myStream = getWritableStreamSomehow ();
412+ myStream .write (' 一些数据' );
413+ myStream .write (' 更多数据' );
414+ myStream .end (' 完成写入数据' );
415+ ```
416+
417+ ### stream.Writable类事件
418+
419+ 1 . close 事件
420+
421+ 当流及其任何底层资源已关闭时,将发出close事件。该事件表明不会发出更多事件,也不会进一步计算。
422+
423+ 如果使用 emitClose 选项创建可写流,它将始终发出 close 事件。
424+
425+ 2 . drain 事件
426+
427+ 如果对 stream.write(chunk) 的调用返回 false,则在适合继续将 数据写入流时 将发出 drain 事件。
428+
429+ 3 . error 事件
430+
431+ 如果在写入管道数据时发生错误,则会发出 error 事件。调用时,监听器回调回传递一个 Error 参数。
432+
433+ 发出 error 事件时,流不会关闭。
434+
435+ 4 . finish 事件
436+
437+ 调用 stream.end() 方法后会发出 finish 事件,并且所有数据都已 刷新到底层系统。
438+
439+ ``` js
440+ const fs = require (' fs' );
441+
442+ const writable = fs .createWriteStream (' write-data.txt' );
443+
444+ for (let i = 0 ; i< 10 ; i++ ) {
445+ writable .write (` 写入#${ i} !\n ` );
446+ }
447+
448+ writable .end (' 写入结尾\n ' );
449+ writable .on (' finish' , () => {
450+ console .log (' 写入已完成' );
451+ })
452+ ```
453+
454+ 5 . pipe 事件
455+
456+ 在可读流上调用 stream.pipe() 方法时会发出 pipe 事件,并将此可写流添加到其目标集。
457+
458+ 6 . unpipe 事件
459+
460+ 当在可读流上调用 stream.unpipe() 时触发。
461+
462+ 当可读流通过管道流向可写流发生错误时,也会触发 unpipe 事件。
463+
464+
465+ ### stream.Writable 类方法
466+
467+ 1 . cork
468+
469+ writable.cork() 方法用于强制把所有写入的数据都缓冲到内存中。当调用 stream.uncork() 或 stream.end() 时,缓冲的数据才会被输出。
470+
471+ 当写入大量小块数据到流时,内部缓冲可能失败,从而导致性能下降,writable.cork() 主要用于避免这种情况。对于这种情况,实现了 writable._ writev() 的流可以用更优的方式对写入的数据进行缓冲。
472+
473+ 2 . destroy
474+
475+ writable.destroy([ error] ) 方法用于销毁流。在调用该方法之后,可写流已经结束,随后对 write() 或 end() 的调用都将导致 ERR_STREAM_DESTROYED 错误。
476+
477+ 如果数据在关闭之前应该刷新,则应使用 end() 方法而不是 destroy() 方法,或者在销毁流之前等待 drain 事件。实现者不应该重写此方法,而是 实现 writable._ destroy()
478+
479+ 3 . end
480+
481+ 调用 writable.end([ chunk] [ , encoding ] [ , callback] ) 方法表示不再将数据 写入 Writable。 该方法的参数如下。
482+
483+ - chunk <string > | <Buffer > | <Unit8Array > | <any >: 要写入的可选数据。对于不在对象模式下运行的流,块必须是字符串、Buffer或Uint8Array。对于对象模式流,块可以是除null之外的任务JavaScript值。
484+ - encoding <string > 如果设置了编码,则chunk是一个字符串。
485+ - callback <Function >: 流完成时的可选回调。
486+
487+ 调用writable.end()方法表示不再将数据写入 Writable 。可选的块和编码参数允许在关闭流之前立即写入最后一个额外的数据块。如果提供,则附加可选回调函数作为 finish 事件的监听器。示例如下:
488+
489+ ``` js
490+ const fs = require (' fs' );
491+
492+ const writable = fs .createWriteStream (' write-data.txt' );
493+
494+ for (let i = 0 ; i< 10 ; i++ ) {
495+ writable .write (` 写入#${ i} !\n ` );
496+ }
351497
498+ writable .end (' 写入结尾\n ' );
499+ writable .on (' finish' , () => {
500+ console .log (' 写入已完成' );
501+ })
502+ ```
503+
504+ 调用 stream.end() 后🐒 调用 stream.write() 方法将引发错误.
505+
506+ 4 . setDefaultEncoding
507+
508+ writable.setDefaultEncoding(encoding) 为可写流设置默认的编码。
509+
510+ 5 . uncork
511+
512+ writable.uncork() 方法用于将调用 stream.cork() 后缓冲的所有数据输出到目标。
513+
514+ 当使用 writable.cork() 和 writable.uncork() 来管理流的写入缓冲时,建议使用 process.nextTick() 来延迟调用 writable.uncork()。
515+
516+ 通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 进行批处理。
517+
518+ ``` js
519+ stream .cork ();
520+ stream .write (' 一些' );
521+ stream .write (' 数据' );
522+ process .nextTick (() => stream .uncork ());
523+ ```
524+
525+ 如果一个流上多次调用 writable.cork(), 则必须调用同样次数的 writable.uncork() 才能输出缓冲的数据。
352526
353527
528+ ## write
354529
530+ writable.write(chunk, [ , encoding] [ , callback ] ) 写入数据到流,并在数据被完全处理之后调用 callback.
355531
532+ 如果发生错误,则 callback 可能被调用也可能不被调用。为了可靠地检测错误,可以为 error 事件添加监听器。该方法的参数如下:
356533
534+ - chunk <string > | <BUffer > | <Unit8Array > | <any > : 要 写入的数据。对于非对象模式的流, chunk必须时字符串 、 Buffer 或 Unit8Array 。
357535
536+ 对于对象模式的流, chunk可以除 null 外的是 任何 JavaScript 值。
358537
538+ - encoding <string >: 如果 chunk 是字符串,则指定 字符编码。
359539
540+ - callback <Function >: 当数据块 被输出到目标后的回调函数。
541+
542+ writable.write() 写入数据到流,并在数据被完全处理之后调用 callback。如果发生错误,则 callback 可能被调用也可能不被调用。为了可靠地检测错误,可以为 error事件添加监听器。
543+
544+ 在接收了 chunk 后,如果内部的 缓冲小于创建流时配置的 highWaterMark,则返回 true。如果返回 false,则应该停止向流写入数据,直到 drain 事件被触发。
545+
546+ 当流还未被排空时,调用 write() 会缓冲 chunk ,并返回 false。一旦所有当前缓冲的数据块都被排空了(被操作系统接收并传输),则触发 drain 事件。建议一旦 write()返回false,则不再写入任何数据块,直到 达到最大内存占用,这时它会无条件中止,甚至在它中止之前,高内存占用将会导致垃圾回收器的性能变差 和 RSS 变高(即使内存不再需要,通常也不会被释放回系统)。
547+
548+ 如果远程的另一端没有读取数据,TCP 的 socket 可能永远也不会排空,所以写入到一个不会排空的 socket 可能会导致产生远程可利用的漏洞。
549+
550+ 对于 Transform,写入数据到一个不会排空的流尤其成问题,因为 Transform 流默认会被暂停,直到它们被 pipe 或者 添加了data 或 readable 事件句柄。
551+
552+ 如果要被写入的数据可以根据需要生成或取得,建议将逻辑封装为一个可读流并且使用 stream pipe()。如果要优先调用 write(),则可以使用 drain 事件来防止背压于避免内存问题。以下是示例。
553+
554+ ``` js
555+ function write (data , cb ) {
556+ if (! stream .write (data)) {
557+ stream .once (' drain' , cb);
558+ } else {
559+ process .nextTick (cb);
560+ }
561+ }
562+
563+ // 在回调函数被执行后再进行其他的写入
564+ write (' hello' , () => {
565+ console .log (' 完成写入' )
566+ });
567+ ```
0 commit comments