Commonly used stream patterns for the Web Streams API and Node.js streams.
@datastream/core
- pipeline
- pipejoin
- streamToArray
- streamToString
- isReadable
- isWritable
- makeOptions
- createReadableStream
- createTransformStream
- createWritableStream
Stream types:
- Readable: The start of a pipeline of streams that injects data into a stream.
- PassThrough: Does not modify the data, but listens to the data and prepares a result that can be retrieved.
- Transform: Modifies data as it passes through.
- Writable: The end of a pipeline of streams that stores data from the stream.
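As a rough illustration of the four roles, the sketch below builds one of each using only Node.js's built-in `node:stream` module rather than @datastream. The names `source`, `lengthTap`, `upperCase`, and `sink` are made up for this example, and the pass-through is approximated with a Transform that forwards each chunk unchanged.

```javascript
import { Readable, Transform, Writable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

// Readable: injects data into the pipeline.
const source = Readable.from(['a', 'b', 'c'])

// PassThrough: forwards chunks untouched while preparing a result (total length).
let length = 0
const lengthTap = new Transform({
  objectMode: true,
  transform (chunk, _encoding, callback) {
    length += chunk.length
    callback(null, chunk)
  }
})

// Transform: modifies each chunk as it passes through.
const upperCase = new Transform({
  objectMode: true,
  transform (chunk, _encoding, callback) {
    callback(null, chunk.toUpperCase())
  }
})

// Writable: the end of the pipeline, stores what it receives.
const stored = []
const sink = new Writable({
  objectMode: true,
  write (chunk, _encoding, callback) {
    stored.push(chunk)
    callback()
  }
})

await pipeline(source, lengthTap, upperCase, sink)
console.log(length, stored) // 3 [ 'A', 'B', 'C' ]
```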
@datastream/string
- stringReadableStream [Readable]
- stringLengthStream [PassThrough]
- stringOutputStream [PassThrough]
@datastream/object
- objectReadableStream [Readable]
- objectCountStream [PassThrough]
- objectBatchStream [Transform]
- objectOutputStream [PassThrough]
@datastream/digest
- digestStream [PassThrough]
@datastream/csv[/{parse,format}]
- csvParseStream [Transform]
- csvFormatStream [Transform]
npm install @datastream/core @datastream/{module}
- pipeline(stream[], options): Connects the streams and waits for completion. Returns the results from the stream taps. Adds a terminating Writable if one is missing.
- pipejoin(stream[]): Connects the streams and returns the resulting stream for use with async iterators.
- streamToArray(stream): Returns an array of the stream's chunks. The stream must not end with a Writable.
- streamToString(stream): Returns a string built from the stream's chunks. The stream must not end with a Writable.
- isReadable(stream): Returns true if the stream is Readable.
- isWritable(stream): Returns true if the stream is Writable.
- makeOptions(options): Makes options interoperable between Readable/Writable and Transform.
- createReadableStream(input, options): Creates a Readable stream from input (string, array, iterable) with options.
- createTransformStream((chunk)=>{}, options): Creates a Transform stream that allows mutation of the chunk before it is passed on.
- createWritableStream((chunk)=>{}, options): Creates a Writable stream that passes each chunk to the given function.
- options:
  - highWaterMark
  - chunkSize
  - signal
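The async-iterator usage mentioned for pipejoin, and the collecting behavior of streamToArray and streamToString, can be approximated with plain Node.js streams. This is a sketch of the idea only, not @datastream's implementation: `collectToArray` and `collectToString` are hypothetical helpers, and `highWaterMark` is shown as the standard Node.js stream option of the same name.

```javascript
import { Readable, Transform } from 'node:stream'

// Hypothetical helper: gathers every chunk of a readable stream into an array.
const collectToArray = async (stream) => {
  const chunks = []
  for await (const chunk of stream) chunks.push(chunk)
  return chunks
}

// Hypothetical helper: concatenates every chunk into one string.
const collectToString = async (stream) => (await collectToArray(stream)).join('')

const double = new Transform({
  objectMode: true,
  highWaterMark: 2, // signal backpressure once 2 objects are buffered
  transform (chunk, _encoding, callback) {
    callback(null, chunk * 2)
  }
})

// pipe() returns the destination stream, which is itself async iterable.
const joined = Readable.from([1, 2, 3]).pipe(double)

const doubled = await collectToArray(joined)
const text = await collectToString(Readable.from(['a,b', ',c']))
console.log(doubled, text) // [ 2, 4, 6 ] a,b,c
```

A stream that ends in a Writable has no readable side to iterate, which is consistent with the restriction noted for streamToArray and streamToString.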
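import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'

// Start at 0 so that `count += 1` does not produce NaN
let count = 0
const streams = [
  createReadableStream('a,b,c\r\n1,2,3'),
  // Count each chunk as it passes through
  createTransformStream(() => { count += 1 }),
  // Log each chunk
  createTransformStream(console.log)
]

// Join the streams into one, then collect its chunks into an array
const river = pipejoin(streams)
const output = await streamToArray(river)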
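For comparison, the same count-and-collect pattern can be sketched with only the standard Web Streams API globals (`ReadableStream`, `TransformStream`), which are available in modern browsers and in Node.js 18 or later. This does not use @datastream, and splitting the input into two chunks is an assumption made for illustration.

```javascript
let count = 0

// Readable side: enqueue two chunks, then close.
const source = new ReadableStream({
  start (controller) {
    controller.enqueue('a,b,c\r\n')
    controller.enqueue('1,2,3')
    controller.close()
  }
})

// Transform side: count each chunk and forward it unchanged.
const counter = new TransformStream({
  transform (chunk, controller) {
    count += 1
    controller.enqueue(chunk)
  }
})

// Collect the resulting chunks with a reader.
const output = []
const reader = source.pipeThrough(counter).getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  output.push(value)
}

console.log(count, output.join('')) // count is 2; the joined output is the original CSV text
```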
- Documentation
- More stream modules
  - charset detection and encoding/decoding
  - compression
  - JSON Schema
- Possible future stream modules
  - JSON Stream
  - JSON
  - IPFS
  - encrypt/decrypt
  - XML