Stream processing construction kit
Status: Alpha. Most major functionality is in place but details are subject to change
Gustav makes realtime data processing simple and modular. Each individual unit of processing (let's call them nodes) can be strung together into a workflow. That probably sounds really complicated. Let's look at some code:
'use strict';
import {gustav} from 'gustav';
import {twitSearch, twitSend} from 'gustav-twitter';
import {sentiment, findAngry} from 'gustav-sentiment';
// Find angry people on Twitter and cheer them up with a walrus
let walrusTweeter = gustav.createWorkflow()
.source(twitSearch, {query: 'bad'})
.transf(sentiment, {prop: 'text'})
.sink(twitSend, {
message: (tweet) =>
`@${tweet.user.screen_name} Have a happy walrus:`
Phew. 16 lines of code, not too many. Let's talk about what's happening here:
With Gustav, individual nodes are strung together into a workflow (if you're a computer science-y person, a workflow is a Directed, Acyclic Graph).
There are three types of nodes in Gustav:
Source nodes create a stream from some external source to be passed on to further Gustav nodes for processing. Almost always, a source node will be calling new Observable
. In the walrus example, we used twitSearch
, so let's look at that:
export let twitSearch = config => {
return new Observable(o => {'statuses/filter', {
track: config.track,
language: 'en'
}, stream => {
console.log('stream', stream);
stream.on('data', function(tweet) {
stream.on('error', function(error) {
is passed in as the second argument to .source
.source(twitSearch, {query: 'bad'})
You don't need to know too much about the Twitter API to recognize that this creates an Observable, gets a stream, and then emits whenever that stream has data/error events. This'll be even simpler when Observable.fromCallback gets merged.
Alright, now we've got an observable sequence of data, what shall we do with it?
Transformer nodes take a stream of data and manipulate it in some way. For example, here's the two transform nodes we used above:
export let sentiment = (config, iO) => {
return => {
let sentiValue = senti(item[config.prop]);
item.sentiment = sentiValue;
return item;
export let findAngry = iO => iO.filter(item => item.sentiment.comparative < 0);
The first (sentiment
) maps over all values, calculates the sentiment of the tweet adding that new data to the value before passing it on. The second (findAngry
) filters the stream, only allowing tweets with a negative comparative value through.
We've got our data, we've shaped it into what we want, now what?
A sink node takes our data and outputs it somewhere. It can be as simple as the console:
export let consoleNode = iO => iO.subscribe(console.log);
or more complicated, like sending tweets:
export let twitSend = (config, iO) => {
return iO.subscribe(item => {'statuses/update', {status: config.message(item)}, function(error, tweet, response){
if(error) throw error;
console.log(tweet); // Tweet body.
console.log(response); // Raw response object.
Anytime you're calling .subscribe
, it should be in a sink node.
Sweet! We have the ability to grab data, change it in some way, and send that data elsewhere. How do we string everything together?
A Gustav flow is a directed, acyclic graph composed of the node types covered above. Gustav supports multiple workflows, so your app may look something like:
// Workflow 0
s1 --- t2 --- o2
// Workflow 1
s2 -- \
t1 --- t2 --- o1
s3 -- /
We create a workflow with the intuitive .createWorkflow()
method, which can be passed an optional name:
let myWorkflow = gustav.createWorkflow('myWorkflow');
This workflow is empty and lonely - let's add some nodes to it. You always need to start with a source
let mySource = gustav.createWorkflow()
.source('stocks', {symbol: 'KITTN'});
Further nodes can be chained onto it, using the following methods:
Attach a transformer.sink
Attach a sink (terminal, returns the workflow itself)merge(...nodes)
Merge several nodes into a single stream.tap
Attach a sink but return the previous node for chaining