Commonly used stream patterns for the Web Streams API and Node.js streams.
@datastream/core
- pipeline
- pipejoin
- streamToArray
- streamToString
- isReadable
- isWritable
- makeOptions
- createReadableStream
- createTransformStream
- createWritableStream
- Readable: The start of a pipeline of streams that injects data into a stream.
- PassThrough: Does not modify the data, but listens to the data and prepares a result that can be retrieved.
- Transform: Modifies data as it passes through.
- Writable: The end of a pipeline of streams that stores data from the stream.
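The four stream roles listed above can be illustrated with the standard Web Streams globals. This is a minimal sketch, not code from @datastream/core: the variable names (`letters`, `lengthTap`, `upperCase`, `sink`) are made up for illustration, and a runtime with global `ReadableStream`, `TransformStream` and `WritableStream` (such as a recent Node.js) is assumed.

```javascript
// Readable: the start of the pipeline, injects data into the stream.
const letters = new ReadableStream({
  start (controller) {
    for (const chunk of ['a', 'b', 'c']) controller.enqueue(chunk)
    controller.close()
  }
})

// PassThrough: forwards every chunk unchanged while preparing a result on the side.
let totalLength = 0
const lengthTap = new TransformStream({
  transform (chunk, controller) {
    totalLength += chunk.length
    controller.enqueue(chunk)
  }
})

// Transform: modifies each chunk as it passes through.
const upperCase = new TransformStream({
  transform (chunk, controller) {
    controller.enqueue(chunk.toUpperCase())
  }
})

// Writable: the end of the pipeline, stores the data it receives.
const stored = []
const sink = new WritableStream({
  write (chunk) {
    stored.push(chunk)
  }
})

// Connect the four roles and wait until the sink has received everything.
await letters.pipeThrough(lengthTap).pipeThrough(upperCase).pipeTo(sink)

console.log(stored) // [ 'A', 'B', 'C' ]
console.log(totalLength) // 3
```

The PassThrough and Transform roles are both built on `TransformStream` here. The only difference is whether the chunk is altered before it is enqueued.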
@datastream/string
- stringReadableStream [Readable]
- stringLengthStream [PassThrough]
- stringOutputStream [PassThrough]
@datastream/object
- objectReadableStream [Readable]
- objectCountStream [PassThrough]
- objectBatchStream [Transform]
- objectOutputStream [PassThrough]
npm install @datastream/core @datastream/{module}
- pipeline(stream[], options): Connects the streams and waits for completion. Returns the results from stream taps. Appends a terminating Writable if one is missing.
- pipejoin(stream[]): Connects the streams and returns the resulting stream, for use with async iterators.
- streamToArray(stream): Returns an array of the stream's chunks. The stream must not end with a Writable.
- streamToString(stream): Returns a string built from the stream's chunks. The stream must not end with a Writable.
- isReadable(stream): Returns true if the stream is Readable.
- isWritable(stream): Returns true if the stream is Writable.
- makeOptions(options): Makes options interoperable between Readable/Writable and Transform streams.
- createReadableStream(input, options): Creates a Readable stream from input (string, array, or iterable).
- createTransformStream((chunk)=>chunk, options): Creates a Transform stream whose function can modify each chunk before it is passed on.
- createWritableStream((chunk)=>{}, options): Creates a Writable stream whose function is called with each chunk.
- options:
  - highWaterMark
  - chunkSize
  - signal
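The behaviour described for pipejoin, streamToArray and streamToString can be approximated with plain Web Streams. The sketch below is an assumption about the general technique, not the library's implementation. The helpers `fromArray`, `joinStreams`, `collectArray` and `collectString` are hypothetical names, and the runtime is assumed to support async iteration of `ReadableStream` (recent Node.js does).

```javascript
// Demo helper (hypothetical): a Readable that emits the given chunks, then closes.
const fromArray = (chunks) => new ReadableStream({
  start (controller) {
    for (const chunk of chunks) controller.enqueue(chunk)
    controller.close()
  }
})

// pipejoin-like: pipe the first stream through each following stream
// and return the readable end of the chain.
const joinStreams = (streams) =>
  streams.reduce((readable, transform) => readable.pipeThrough(transform))

// streamToArray-like: gather every chunk with an async iterator.
const collectArray = async (stream) => {
  const chunks = []
  for await (const chunk of stream) chunks.push(chunk)
  return chunks
}

// streamToString-like: concatenate the gathered chunks.
const collectString = async (stream) => (await collectArray(stream)).join('')

// A Transform used only for this demo.
const double = () => new TransformStream({
  transform (chunk, controller) {
    controller.enqueue(chunk * 2)
  }
})

const numbers = await collectArray(joinStreams([fromArray([1, 2, 3]), double()]))
const text = await collectString(fromArray(['a,b,c', '\r\n', '1,2,3']))

console.log(numbers) // [ 2, 4, 6 ]
console.log(text === 'a,b,c\r\n1,2,3') // true
```

The sketch also suggests why the collected stream must not end with a Writable: a Writable has no readable side, so there is nothing left to iterate over.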
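import {
  pipejoin,
  streamToArray,
  createReadableStream,
  createTransformStream,
} from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
// Number of chunks seen by the first transform.
let count = 0
const streams = [
  createReadableStream('a,b,c\r\n1,2,3'),
  // Count each chunk and return it so it continues down the stream.
  createTransformStream((chunk) => {
    count += 1
    return chunk
  }),
  // Log each chunk, then pass it along unchanged.
  createTransformStream((chunk) => {
    console.log(chunk)
    return chunk
  }),
]
const river = pipejoin(streams)
const output = await streamToArray(river)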
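The example above reads the joined stream's chunks with streamToArray. When only the side results matter, the pipeline behaviour described earlier (connect, append a terminating Writable if missing, wait, return tap results) could look roughly like the following sketch built on Web Streams globals. Everything here is an assumption for illustration: `countTap`, `runPipeline`, and the `result()` accessor with its `{ key, value }` shape are hypothetical and not taken from @datastream/core.

```javascript
// Hypothetical tap: passes chunks through unchanged and exposes a result() accessor.
const countTap = () => {
  let count = 0
  const stream = new TransformStream({
    transform (chunk, controller) {
      count += 1
      controller.enqueue(chunk)
    }
  })
  stream.result = () => ({ key: 'count', value: count })
  return stream
}

// Hypothetical pipeline: connect the streams, append a discarding Writable
// when the last stream is not one, wait for completion, gather tap results.
const runPipeline = async (streams) => {
  const [source, ...rest] = streams
  const last = rest[rest.length - 1]
  // A WritableStream without an underlying sink accepts and discards chunks.
  const sink = last instanceof WritableStream ? rest.pop() : new WritableStream()
  const readable = rest.reduce((r, transform) => r.pipeThrough(transform), source)
  await readable.pipeTo(sink)

  const results = {}
  for (const stream of streams) {
    if (typeof stream.result === 'function') {
      const { key, value } = stream.result()
      results[key] = value
    }
  }
  return results
}

const rows = new ReadableStream({
  start (controller) {
    for (const row of [{ a: 1 }, { a: 2 }, { a: 3 }]) controller.enqueue(row)
    controller.close()
  }
})

const results = await runPipeline([rows, countTap()])
console.log(results) // { count: 3 }
```

Keeping the counter inside the tap, rather than in an outer variable as in the example above, lets the pipeline return every result in one object once the stream has finished.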
- Documentation
- More stream modules
  - charset detection and encoding/decoding
  - compression
  - JSON Schema
- Possible future stream modules
  - JSON Stream
  - JSON
  - IPFS
  - encrypt/decrypt
  - XML