Skip to content

Latest commit

 

History

History
178 lines (131 loc) · 4.03 KB

readable.md

File metadata and controls

178 lines (131 loc) · 4.03 KB

Readable

可读流的功能是作为上游,提供数据给下游。

本文中用readable来指代一个Readable实例。

如何创建

可读流通过push方法产生数据,存入readable的缓存中。 当调用push(null)时,便宣告了流的数据产生的结束。

正常情况下,需要为流实例提供一个_read方法,在这个方法中调用push产生数据。 既可以在同一个tick中(同步)调用push,也可以异步的调用(通常如此)。

在需要数据时,流内部会自动调用_read方法来往缓存中添加数据。

var Stream = require('stream')

var readable = Stream.Readable()

var source = ['a', 'b', 'c']

readable._read = function () {
  this.push(source.shift() || null)
}

var Stream = require('stream')

var source = ['a', 'b', 'c']
var readable = Stream.Readable({
  read: function () {
    this.push(source.shift() || null)
  },
})

end事件

数据被消耗完时,会触发end事件。 所谓“消耗完”,需要满足两个条件:

  • 已经调用push(null),声明不会再有任何新的数据产生
  • 缓存中的数据也被读取完
var Stream = require('stream')

var source = ['a', 'b', 'c']
var readable = Stream.Readable({
  read: function () {
    var data = source.shift() || null
    console.log('push', data)
    this.push(data)
  },
})

readable.on('end', function () {
  console.log('end')
})

readable.on('data', function (data) {
  console.log('data', data)
})

输出:

⌘ node example/event-end.js
push a
push b
data <Buffer 61>
push c
data <Buffer 62>
push null
data <Buffer 63>
end

如何使用

下游通过监听data事件(flowing模式)或通过调用read方法(paused模式), 从缓存中获取数据进行消耗。

flowing模式

flowing模式下,readable的数据会持续不断的生产出来, 每个数据都会触发一次data事件,通过监听该事件来获得数据。

以下条件均可以使readable进入flowing模式:

  • 调用resume方法
  • 如果之前未调用pause方法进入paused模式,则监听data事件也会调用resume方法。
  • readable.pipe(writable)pipe中会监听data事件。
var Stream = require('stream')

var source = ['a', 'b', 'c']

var readable = Stream.Readable({
  read: function () {
    this.push(source.shift() || null)
  },
})

readable.on('data', function (data) {
  console.log(data)
})

输出结果:

⌘ node example/flowing-mode.js
<Buffer 61>
<Buffer 62>
<Buffer 63>

通常,并不直接监听data事件去消耗流,而是通过pipe方法去消耗。

paused模式

paused模式下,通过readable.read去获取数据。

以下条件均可使readable进入paused模式:

  • 流创建完成,即初始状态
  • flowing模式下调用pause方法
  • 通过unpipe移除所有下游

除了初始状态外,很少会在paused模式下使用流。 一般不会调用pause方法或是unpipe方法从flowing模式切换至paused模式。

var Stream = require('stream')

var source = ['a', 'b', 'c']

var readable = Stream.Readable({
  read: function () {
    this.push(source.shift() || null)
  },
})

readable.on('readable', function () {
  var data
  while (data = this.read()) {
    console.log(data)
  }
})

输出结果:

⌘ node example/paused-mode.js
<Buffer 61 62>
<Buffer 63>

readable事件表示流中产生了新数据,或是到了流的尽头。 read(n)时,会从缓存中试图读取相应的字节数。 当n未指定时,会一次将缓存中的字节全部读取。

如果在flowing模式下调用read(),不指定n时,会逐个元素地将缓存读完, 而不像paused模式会一次全部读取。

小结

  • 使用push来产生数据。
  • 必须调用push(null)来结束流,否则下游会一直等待。
  • push可以同步调用,也可异步调用。
  • end事件表示可读流中的数据已被完全消耗。
  • 通常在flowing模式下使用可读流。