Pipeline Transform is a library for interacting with sequences in TypeScript in a type-safe, pipeline-structured way. It's great for:
- Efficient data pipelines
- Handling infinite or indefinitely long sequences
- Simplifying async operations on collections
Start by creating a `PipelineTransform` instance via `PipelineTransform.from`. It accepts arrays or anything iterable (like generators):
```ts
import { PipelineTransform } from "PipelineTransform";

const pt = PipelineTransform.from([1, 2, 3, 4, 5]);
```
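Since `from` accepts any iterable, a generator works just as well (`countdown` here is just an illustration, not part of the library):

```ts
// Any iterable can act as a source, including generators.
function* countdown() {
  yield 3;
  yield 2;
  yield 1;
}

const fromGenerator = PipelineTransform.from(countdown());
```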
From here, chain on as many sequence operators as you'd like:

```ts
const pt = PipelineTransform.from([1, 2, 3, 4, 5])
  .map(x => x * 2)
  .filter(x => x >= 4)
  .map(x => x.toString().repeat(x))
  .take(3);
```

It's important to note that nothing has actually happened yet, since we haven't given this pipeline any reason to run: pipelines are lazily evaluated for efficiency. Right now, `pt` is a `PipelineTransform<string>`.
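To see that laziness directly, here's a minimal sketch (using a side-effectful `map` callback purely for demonstration):

```ts
let calls = 0;

const lazy = PipelineTransform.from([1, 2, 3])
  .map(x => {
    calls++; // only runs once the pipeline is evaluated
    return x * 2;
  });

console.log(calls);   // 0 — no stage has run yet
await lazy.toArray(); // materializing pulls every item through the pipe
console.log(calls);   // 3
```

So what can we do with a pipeline once it's built?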
`PipelineTransform` instances are iterable via the async iterable protocol:
```ts
for await (const item of pt) {
  console.log(item);
}
```

This is useful if you'd like to pass a `PipelineTransform` into another library or data-processing pipeline that also integrates this protocol.
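For example, any consumer typed against `AsyncIterable` can take one directly (a sketch; `lengths` is a hypothetical consumer, not part of the library):

```ts
async function lengths(items: AsyncIterable<string>): Promise<number[]> {
  const result: number[] = [];
  for await (const item of items) {
    result.push(item.length);
  }
  return result;
}

const itemLengths = await lengths(
  PipelineTransform.from(["a", "bb", "ccc"])
); // [1, 2, 3]
```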
Alternatively, we can fully "materialize" the pipeline into an array with `toArray`:
```ts
const array = await pt.toArray();
```

Other operators, like `some`, `every`, `includes`, `find`, `join`, and `reduce`, start evaluating the pipeline to return a result. The result is always a `Promise`, regardless of whether any async operations were used, so adding async operations later in your pipeline never forces a refactor.
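For instance, even a fully synchronous pipeline is awaited (assuming these operators mirror their array counterparts' signatures, as the method list below notes):

```ts
const hasEven = await PipelineTransform.from([1, 2, 3])
  .some(x => x % 2 === 0); // true

const sum = await PipelineTransform.from([1, 2, 3])
  .reduce((acc, x) => acc + x, 0); // 6
```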
You may be used to the friction async steps cause when processing arrays in similar chains:
```ts
[1, 2, 3, 4, 5]
  .map(x => x * 2)
  .filter(x => x > 2) // Whatever, contrived example
  .map(async x => {
    const url = `https://api/item/${x}`;
    const result = await fetch(url);
    const json = await result.json();
    return json;
  });
// ...?
```

Here we'd need to stop the pipeline, assign this to a variable, `await Promise.all`, then continue:
```ts
const responsePromises = // above code
const responses = await Promise.all(responsePromises);
const tags = responses.flatMap(json => json.tags);
```

This can be clunky depending on what you're trying to accomplish! With `PipelineTransform`, this is not an issue at all:
```ts
const tags = await PipelineTransform.from([1, 2, 3, 4, 5])
  .map(x => x * 2)
  .filter(x => x > 2) // Whatever, contrived example
  .map(async x => {
    const url = `https://api/item/${x}`;
    const result = await fetch(url);
    const json = await result.json();
    return json;
  })
  .flatMap(json => json.tags)
  .toArray(); // Easy!
```

The array examples above are also sub-optimal, since they iterate the array multiple times. This example iterates three times:
```ts
[1, 2, 3, 4, 5]
  .map(x => x * 2)         // 2, 4, 6, 8, 10
  .filter(x => x > 2)      // 4, 6, 8, 10
  .map(x => x.toString()); // "4", "6", "8", "10"
```

With pipelines, we "iterate" once: items are yielded one by one from the "top" of the pipe and fall through each stage. Items that make it to the end are candidates for materialization, or can be returned.
```ts
await PipelineTransform.from([1, 2, 3, 4, 5])
  .map(x => x * 2)
  .filter(x => x > 2)
  .map(x => x.toString())
  .toArray();
```
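Laziness also means a downstream stage can cut upstream work short. A sketch (assuming `take` stops pulling items once it has enough, in line with its description below):

```ts
let mapCalls = 0;

const firstTwo = await PipelineTransform.from([1, 2, 3, 4, 5])
  .map(x => {
    mapCalls++;
    return x * 2;
  })
  .take(2)
  .toArray();

console.log(firstTwo); // [2, 4]
console.log(mapCalls); // 2 — items 3..5 were never pulled through
```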
`PipelineTransform` supports these methods, which should all behave similarly to their array counterparts: `map`, `flatMap`, `flat`, `filter`, `concat`, `forEach`, `includes`, `find`, `some`, `every`, `join`, `reduce`.
A couple of exceptions are `filter` and `find`, which attempt additional type narrowing and respect type guards.
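For instance, a user-defined type guard narrows the pipeline's element type (a sketch of the narrowing described above):

```ts
const mixed: (string | number)[] = [1, "a", 2, "b"];

const strings = await PipelineTransform.from(mixed)
  .filter((x): x is string => typeof x === "string")
  .toArray(); // inferred as string[], not (string | number)[]
```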
The API also includes a few additional helpful methods:

- `toArray` - fully materialize the stream into an array.
- `batch` - groups items in the sequence into arrays of a specified size (which are then sequenced).
- `take` - useful for infinite or long sequences. Think of this as similar to `Array.slice(0, N)`.
- `inspect` - `console.log` every item that passes through this stage. Accepts labels to prefix calls to `console.log` with.
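Putting a few of these together on an infinite source (a sketch; the exact format `inspect` logs is an assumption):

```ts
// An infinite generator — fine as a source, since pipelines are lazy.
function* naturals() {
  let n = 1;
  while (true) yield n++;
}

const firstSquares = await PipelineTransform.from(naturals())
  .map(x => x * x)
  .inspect("square") // logs each item with the "square" label as it passes
  .take(4)           // stops the infinite sequence after four items
  .toArray();        // [1, 4, 9, 16]
```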
Contributions are absolutely welcome! A bit about this project: