Skip to content

Latest commit

 

History

History
71 lines (54 loc) · 2.51 KB

Tutorial.md

File metadata and controls

71 lines (54 loc) · 2.51 KB

How to write a query

Queries are defined in PipeFabric as dataflow programs in C++, but we provide a DSL simplyfing the formulation. As an example we want to implement a simple dataflow receiving data via REST, calculating a moving average over the last 10 values, and sending the results to tne console. Let's look at the C++ program.

First, we need to include the PipeFabric header file and should use the namespace.

#include "pfabric.hpp"

using namespace pfabric;

Next, we define the schema: the tuple types for representing input and output data:

// the structure of tuples we receive via REST
typedef TuplePtr<int, double> InTuplePtr;

// the structure of our output (aggregate) tuples
typedef TuplePtr<double> ResultTuplePtr;

And for the aggregation we have to define a type the captures the aggregation state.

// the aggregate operator needs a state object that is defined here:
// template parameters are:
//     * the input type,
//     * the aggregate function (in our case Avg on double values),
//     * the column of the input tuple on which we calculate the aggregate (in
//       our case column #1)
typedef Aggregator1<InTuplePtr, AggrAvg<double, double>, 1> MyAggrState;

In the main function we first create a PFabricContext object which is needed to create a new topology. A topology represents a dataflow (i.e. a query) and is used to add operators. In our example, we start with a REST source for producing a stream of tuples. In the next step, we extract the keyand data fields and produce instances of InTuplePtr. Then, we add a sliding window and an aggregate operator with the state class defined before. Finally, we add the print operator for sending results to std::cout and start the topology. The default mode of starting is asynchronously, therefore we invoke wait() to wait until the execution is finished (which will never happen in this example).

int main(int argc, char **argv) {
  PFabricContext ctx;

  auto t = ctx.createTopology();

  auto s = t->newStreamFromREST(8099, "^/publish$", RESTSource::POST_METHOD)
    .extractJson<InTuplePtr>({"key", "data"})
    .slidingWindow(WindowParams::RowWindow, 10)
    .aggregate<MyAggrState> ()
    .print(std::cout);

  t->start();
  t->wait();
}

After starting the program with ./RestDemo we can send some data via curl:

curl -H "Content-Type: application/json" \
     -X POST -d '{"key": "10", "data": "1.0"}' \
     http://localhost:8099/publish

and will receive the output from our stream query.