These wrappers implement a pull style API.
For readable streams, instead of having the stream push the data to its consumer by emitting data and end events, the wrapper lets the consumer pull the data from the stream by calling asynchronous read methods.
The wrapper takes care of the low level pause/resume logic.
Similarly, for writable streams, the wrapper provides a simple asynchronous write method and takes care of the low level drain logic.
For more information on this design, see this blog post
For a simple example of this API in action, see the google client example
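To make the pull idea concrete, here is a minimal sketch in plain Node.js, using promises rather than streamline's `_` callbacks. The `pullReader` helper is hypothetical and is not the module's implementation; it only illustrates how pause/resume can be hidden behind an asynchronous read call.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not part of the streams module): wraps a Node readable
// stream and exposes a promise-based read() that the consumer calls on demand.
function pullReader(stream) {
  const chunks = [];   // chunks received but not yet consumed
  let ended = false;
  let error = null;
  let waiting = null;  // { resolve, reject } of the pending read(), if any

  // Pause first so that attaching the 'data' listener does not start the flow.
  stream.pause();
  stream.on('data', (chunk) => {
    if (waiting) {
      const w = waiting;
      waiting = null;
      w.resolve(chunk);
    } else {
      // Nobody is asking for data: keep the chunk and stop the flow.
      chunks.push(chunk);
      stream.pause();
    }
  });
  stream.on('end', () => {
    ended = true;
    if (waiting) {
      const w = waiting;
      waiting = null;
      w.resolve(null);
    }
  });
  stream.on('error', (err) => {
    error = err;
    if (waiting) {
      const w = waiting;
      waiting = null;
      w.reject(err);
    }
  });

  return {
    // Resolves to the next chunk, or to null once the stream has ended.
    read() {
      if (chunks.length > 0) return Promise.resolve(chunks.shift());
      if (error) return Promise.reject(error);
      if (ended) return Promise.resolve(null);
      return new Promise((resolve, reject) => {
        waiting = { resolve, reject };
        stream.resume();
      });
    },
  };
}

// Example source: an in-memory stream, used only for the demonstration.
const demoSource = () => Readable.from(['ab', 'cd']);
```

A consumer would loop on `await reader.read()` until it gets `null`. The sketch supports one pending read at a time, which matches a sequential consumer.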
Base wrapper for all objects that emit an end or close event.
All stream wrappers derive from this wrapper.
wrapper = new streams.Wrapper(stream)
creates a wrapper.
emitter = wrapper.emitter
returns the underlying emitter. The emitter stream can be used to attach additional observers.
closed = wrapper.closed
returns true if the close event has been received.
emitter = wrapper.unwrap()
unwraps and returns the underlying emitter.
The wrapper should not be used after this call.
All readable stream wrappers derive from this wrapper.
stream = new streams.ReadableStream(stream[, options])
creates a readable stream wrapper.
stream.setEncoding(enc)
sets the encoding. Returns this for chaining.
data = stream.read(_[, len])
reads asynchronously from the stream and returns a string or a Buffer depending on the encoding.
If a len argument is passed, the read call returns when len characters or bytes (depending on encoding) have been read, or when the underlying stream has emitted its end event (so it may return fewer than len bytes or chars). Reads till the end if len is negative.
Without len, the read call returns the data chunks as they have been emitted by the underlying stream.
Once the end of stream has been reached, the read call returns null.
data = stream.readAll(_)
reads till the end of stream.
Equivalent to stream.read(_, -1).
stream.unread(chunk)
pushes the chunk back to the stream.
Returns this for chaining.
len = stream.available()
returns the number of bytes/chars that have been received and not read yet.
reader = stream.reader
returns a clean ez reader.
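The len rules above can be modelled in a few lines. The following is a sketch in plain Node.js with promises, restricted to string chunks; `sizedReader` and its `nextChunk` argument are hypothetical names, and the behaviour in edge cases (for example an empty stream) is an assumption rather than something stated in this document.

```javascript
// Hypothetical model of the documented read/unread/available semantics.
// `nextChunk` is an assumed async function that resolves to the next string
// chunk, or to null once the source has ended.
function sizedReader(nextChunk) {
  let pending = '';   // received (or pushed back) but not yet read
  let ended = false;

  async function fill() {
    const chunk = await nextChunk();
    if (chunk === null) ended = true;
    else pending += chunk;
  }

  return {
    async read(len) {
      if (len === undefined) {
        // No len: return whatever has been received, waiting for one chunk if needed.
        while (pending.length === 0 && !ended) await fill();
        if (pending.length === 0) return null;
        const all = pending;
        pending = '';
        return all;
      }
      // A negative len means "read until the end of the stream".
      const wanted = len < 0 ? Infinity : len;
      while (pending.length < wanted && !ended) await fill();
      if (pending.length === 0 && ended) return null;
      const out = pending.slice(0, wanted);   // may be shorter than len at end of stream
      pending = pending.slice(out.length);
      return out;
    },
    // Pushes data back so that the next read sees it first.
    unread(chunk) {
      pending = chunk + pending;
      return this;
    },
    // Number of characters received and not read yet.
    available() {
      return pending.length;
    },
  };
}
```

With a source that yields 'hello ', 'wor' and 'ld', `read(3)` resolves to 'hel' and leaves three characters available; after `unread('hel')`, `read(8)` resolves to 'hello wo' and `read(-1)` to the remaining 'rld'.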
All writable stream wrappers derive from this wrapper.
stream = new streams.WritableStream(stream[, options])
creates a writable stream wrapper.
stream.write(_, data[, enc])
Writes the data.
This operation is asynchronous because it drains the stream if necessary.
Returns this for chaining.
stream.end()
signals the end of the send operation.
Returns this for chaining.
writer = stream.writer
returns a clean ez writer.
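The reason write is asynchronous can be shown with Node's own stream API. This is a sketch with promises, not the wrapper's code; `writeAndDrain` is a hypothetical name.

```javascript
const { once } = require('events');

// Hypothetical helper: writes one chunk to a Node writable stream and, if the
// stream reports that its internal buffer is full, waits for the 'drain'
// event before resolving.
async function writeAndDrain(stream, data, enc) {
  const canContinue = stream.write(data, enc);
  if (!canContinue) {
    await once(stream, 'drain');
  }
  return stream;   // returned for chaining, mirroring the documented API
}
```

A caller that awaits each `writeAndDrain` call never queues more data than the destination is ready to accept, which is the back-pressure handling the wrapper hides.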
This is a wrapper around node's http.ServerRequest.
This stream is readable (see ReadableStream above).
request = new streams.HttpServerRequest(req[, options])
returns a wrapper around req, an http.ServerRequest object.
The options parameter can be used to pass lowMark and highMark values, or to control encoding detection (see section below).
method = request.method
url = request.url
headers = request.headers
trailers = request.trailers
httpVersion = request.httpVersion
connection = request.connection
socket = request.socket
(same as http.ServerRequest)
This is a wrapper around node's http.ServerResponse.
This stream is writable (see WritableStream above).
response = new streams.HttpServerResponse(resp[, options])
returns a wrapper around resp, an http.ServerResponse object.
response.writeContinue()
response.writeHead(head)
response.setHeader(name, value)
value = response.getHeader(head)
response.removeHeader(name)
response.addTrailers(trailers)
response.statusCode = value
(same as http.ServerResponse)
This is a wrapper around node's http.Server object:
server = streams.createHttpServer(requestListener[, options])
creates the wrapper.
requestListener is called as requestListener(request, response, _) where request and response are wrappers around http.ServerRequest and http.ServerResponse.
A fresh empty global context is set before every call to requestListener. See Global context API.
server.listen(_, port[, host])
server.listen(_, path)
(same as http.Server)
This is a wrapper around node's http.ClientResponse.
This stream is readable (see ReadableStream above).
response = new HttpClientResponse(resp, options)
wraps a node response object.
options.detectEncoding can be used to control encoding detection (see section below).
response = request.response(_)
returns the response stream.
status = response.statusCode
returns the HTTP status code.
version = response.httpVersion
returns the HTTP version.
headers = response.headers
returns the HTTP response headers.
trailers = response.trailers
returns the HTTP response trailers.
response.checkStatus(statuses)
throws an error if the status is not in the statuses array.
If only one status is expected, it may be passed directly as an integer rather than as an array.
Returns this for chaining.
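The checkStatus contract (integer or array, throw on mismatch, chainable) can be sketched as a standalone function. This is an illustration only: the function below is hypothetical, and the error message is an assumption, since this document does not say what the thrown error contains.

```javascript
// Hypothetical stand-in for the documented checkStatus behaviour.
// `response` is any object with a numeric statusCode property.
function checkStatus(response, statuses) {
  // Accept a single integer as shorthand for a one-element array.
  const expected = Array.isArray(statuses) ? statuses : [statuses];
  if (!expected.includes(response.statusCode)) {
    // The message format is an assumption made for this sketch.
    throw new Error('unexpected HTTP status: ' + response.statusCode);
  }
  return response;   // returned for chaining
}
```

For a response with status 200, both `checkStatus(res, 200)` and `checkStatus(res, [200, 304])` return the response, while `checkStatus(res, [404])` throws.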
This is a wrapper around node's http.ClientRequest.
This stream is writable (see WritableStream above).
request = streams.httpRequest(options)
creates the wrapper.
The options are the following:
- method: the HTTP method, 'GET' by default.
- headers: the HTTP headers.
- url: the requested URL (with query string if necessary).
- proxy.url: the proxy URL.
- lowMark and highMark: low and high water mark values for buffering (in bytes or characters depending on encoding).
Note that these values are only hints as the data is received in chunks.
response = request.response(_)
returns the response.
request.abort()
aborts the request.
This is a wrapper around streams returned by TCP and socket clients:
These streams are both readable and writable (see ReadableStream and WritableStream above).
stream = new streams.NetStream(stream[, options])
creates a network stream wrapper.
These are wrappers around node's net.createConnection:
client = streams.tcpClient(port, host[, options])
returns a TCP connection client.
client = streams.socketClient(path[, options])
returns a socket client.
The options parameter of the constructor provides options for the stream (lowMark and highMark). If you want different options for read and write operations, you can specify them by creating options.read and options.write sub-objects inside options.
stream = client.connect(_)
connects the client and returns a network stream.
This is a wrapper around node's net.Server object:
server = streams.createNetServer([serverOptions,] connectionListener [, streamOptions])
creates the wrapper.
connectionListener is called as connectionListener(stream, _) where stream is a NetStream wrapper around the native connection.
A fresh empty global context is set before every call to connectionListener. See Global context API.
server.listen(_, port[, host])
server.listen(_, path)
(same as net.Server)
result = streams.using(_, constructor, stream[, options], fn)
wraps stream with an instance of constructor; passes the wrapper to fn(_, wrapper) and closes the stream after fn returns.
fn is called inside a try/finally block to guarantee that the stream is closed in all cases.
Returns the value returned by fn.
result = streams.usingReadable(_, stream[, options], fn)
shortcut for streams.using(_, streams.ReadableStream, stream, options, fn)
result = streams.usingWritable(_, stream[, options], fn)
shortcut for streams.using(_, streams.WritableStream, stream, options, fn)
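The try/finally guarantee described for using can be illustrated with a promise-based analogue. This sketch is not the module's code: `usingSketch` is a hypothetical name, and it assumes the wrapper exposes a `close()` method, because this document does not say how the stream is actually closed.

```javascript
// Hypothetical promise-based analogue of the documented helper.
// Assumption: the wrapper created by `Constructor` has a close() method.
async function usingSketch(Constructor, stream, options, fn) {
  const wrapper = new Constructor(stream, options);
  try {
    // The value returned (or resolved) by fn becomes the result.
    return await fn(wrapper);
  } finally {
    // Runs whether fn returned normally or threw.
    wrapper.close();
  }
}
```

The point of the pattern is that the caller cannot forget the cleanup: an exception thrown inside `fn` still propagates, but only after the wrapper has been closed.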
streams.pump(_, inStream, outStream)
Pumps from inStream to outStream.
Does not close the streams at the end.
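A pump over pull-style streams reduces to a read/write loop. The sketch below uses promises and assumes two hypothetical objects: an input whose `read()` resolves to a chunk or to null at end of stream, and an output whose `write(chunk)` resolves once the chunk has been accepted. It is an illustration of the idea, not the module's implementation.

```javascript
// Hypothetical sketch of a pump loop over promise-based pull streams.
async function pumpSketch(input, output) {
  for (let chunk = await input.read(); chunk !== null; chunk = await input.read()) {
    // Awaiting each write keeps the reader from running ahead of the writer.
    await output.write(chunk);
  }
  // Deliberately no output.end() here: the caller decides when to close.
}
```

Because neither stream is closed, the same output can receive several inputs in sequence before the caller ends it.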
The options.detectEncoding option controls how the encoding is set by the HttpServerRequest and HttpClientResponse constructors.
This option can take the following values:
- strict: the RFC-2616-7.2.1 rules are applied.
- default: the default algorithm used by streamline v0.4 is used. This algorithm is more lenient and sets the encoding to utf8 when text content is detected, even if there is no charset indication.
- disable: null is always returned and the stream is always handled in binary mode (buffers rather than strings).
- a function. This is a hook for custom encoding detection. The function is called as fn(headers) and returns the encoding.
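As an illustration of the function form, here is a sketch of a custom hook that trusts only an explicit charset parameter. The name `detectFromCharset` and the charset table are hypothetical, and returning null to request binary mode is an assumption made by analogy with the disable value.

```javascript
// Charset labels mapped to encoding names that Node accepts.
// The table is deliberately small and is an assumption of this sketch.
const CHARSET_TO_ENCODING = new Map([
  ['utf-8', 'utf8'],
  ['utf8', 'utf8'],
  ['iso-8859-1', 'latin1'],
  ['us-ascii', 'ascii'],
]);

// Hypothetical hook, called as fn(headers): returns an encoding name when the
// Content-Type header carries a known charset, and null otherwise.
function detectFromCharset(headers) {
  const contentType = headers['content-type'] || '';
  const match = /;\s*charset\s*=\s*"?([^";\s]+)"?/i.exec(contentType);
  if (!match) return null;
  return CHARSET_TO_ENCODING.get(match[1].toLowerCase()) || null;
}
```

Such a function would be passed as options.detectEncoding. Unlike the lenient default described above, it never guesses utf8 for text content that lacks a charset.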