stream-processing: getting-started: hands-on: general cleanup #1629

Merged
83 changes: 29 additions & 54 deletions stream-processing/getting-started/hands-on.md
@@ -1,42 +1,43 @@
# Hands On! 101
# Tutorial

This article goes through very specific and simple steps to learn how Stream Processor works. For simplicity it uses a custom Docker image that contains the relevant components for testing.
Follow this tutorial to learn more about stream processing.

## Requirements

The following tutorial requires the following software components:
This tutorial requires the following components:

* [Fluent Bit](https://fluentbit.io) >= v1.2.0
* [Docker Engine](https://www.docker.com/products/docker-engine) \(not mandatory if you already have Fluent Bit binary installed in your system\)
* Fluent Bit
* [Docker Engine](https://www.docker.com/products/docker-engine)
* A stream processing [sample file](https://raw.githubusercontent.com/fluent/fluent-bit-docs/37b477786d6e28eb223e08611c26ec93671a34ac/stream-processing/samples/sp-samples-1k.log)

In addition download the following data [sample file](https://raw.githubusercontent.com/fluent/fluent-bit-docs/37b477786d6e28eb223e08611c26ec93671a34ac/stream-processing/samples/sp-samples-1k.log) \(130KB\).
## Steps

## Stream Processing using the command line

For all next steps we will run Fluent Bit from the command line, and for simplicity we will use the official Docker image.
These steps use the official Fluent Bit Docker image.

### 1. Fluent Bit version

Run the following command to confirm that Fluent Bit is installed and up-to-date:

```bash
$ docker run -ti fluent/fluent-bit:1.8.2 /fluent-bit/bin/fluent-bit --version
Fluent Bit v1.8.2
```

### 2. Parse sample files

The samples file contains JSON records. On this command, we are appending the Parsers configuration file and instructing _tail_ input plugin to parse the content as _json_:
The sample file contains JSON records. Run the following command to append the `parsers.conf` file and instruct the Tail input plugin to parse content as JSON:

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
fluent/fluent-bit:1.8.2 \
/fluent-bit/bin/fluent-bit -R /fluent-bit/etc/parsers.conf \
-i tail -p path=/sp-samples-1k.log \
-p parser=json \
-p read_from_head=true \
-p read_from_head=true \
-o stdout -f 1
```

The command above will simply print the parsed content to the standard output interface. The content will print the _Tag_ associated to each record and an array with two fields: record timestamp and record map:
This command prints the parsed content to the standard output interface. The parsed content includes a tag associated with each record and an array with two fields: a timestamp and a record map:

```text
Fluent Bit v1.8.2
@@ -58,11 +59,9 @@ Fluent Bit v1.8.2
[5] tail.0: [1557322456.315550927, {"date"=>"22/abr/2019:12:43:52 -0600", "ip"=>"132.113.203.169", "word"=>"fendered", "country"=>"United States", "flag"=>true, "num"=>53}]
```
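The printed format, a tag followed by a two-element array holding a timestamp and a record map, can be modelled in a few lines. The following Python sketch is illustrative only: `to_event` is a hypothetical helper, and the raw line layout is an assumption based on the keys visible in the printed record. It is not how the Tail plugin is implemented.

```python
import json


def to_event(raw_line, tag, timestamp):
    """Parse one JSON log line into (tag, [timestamp, record map])."""
    record = json.loads(raw_line)
    return tag, [timestamp, record]


# Assumed raw form of the record printed as entry [5] above.
raw = (
    '{"date": "22/abr/2019:12:43:52 -0600", "ip": "132.113.203.169", '
    '"word": "fendered", "country": "United States", "flag": true, "num": 53}'
)

tag, event = to_event(raw, "tail.0", 1557322456.315550927)
print(tag, event[1]["word"], event[1]["num"])
```

The tag identifies where the record came from, and the record map is what later queries select keys from.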

As of now there is no Stream Processing, on step \#3 we will start doing some basic queries.

### 3. Selecting specific record keys
### 3. Select specific record keys

This command introduces a Stream Processor \(SP\) query through the **-T** option and changes the output plugin to _null_, this is done with the purpose of obtaining the SP results in the standard output interface and avoid confusions in the terminal.
Run the following command to create a stream processor query using the `-T` flag and change the output to the Null plugin. This obtains the stream processing results in the standard output interface and avoids confusion in the terminal.

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
@@ -77,7 +76,7 @@ $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-o null -f 1
```

The query above aims to retrieve all records that a key named _country_ value matches the value _Chile_, and for each match compose and output a record using only the key fields _word_ and _num_:
The previous query aims to retrieve all records for which the `country` key contains the value `Chile`. For each match, it composes and outputs a record that only contains the keys `word` and `num`:

```text
[0] [1557322913.263534, {"word"=>"Candide", "num"=>94}]
@@ -87,9 +86,9 @@ The query above aims to retrieve all records that a key named _country_ value ma
[0] [1557322913.263706, {"word"=>"decasyllables", "num"=>76}]
```
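The filter-then-project behavior described for this query can be sketched in plain Python. This is a model of the semantics only, not the stream processor's implementation: `select_word_num` is a hypothetical helper, and the Chile record below is invented for illustration apart from its `word` and `num` values, which are taken from the first output line.

```python
def select_word_num(records, country="Chile"):
    """Keep records whose country matches, then emit only word and num."""
    return [
        {"word": r["word"], "num": r["num"]}
        for r in records
        if r.get("country") == country
    ]


records = [
    # Record shown in the step 2 output.
    {"ip": "132.113.203.169", "word": "fendered",
     "country": "United States", "flag": True, "num": 53},
    # Hypothetical record; only word and num come from the output above.
    {"ip": "203.0.113.7", "word": "Candide",
     "country": "Chile", "flag": False, "num": 94},
]

selected = select_word_num(records)
print(selected)
```

Only the second record passes the filter, and its other keys (`ip`, `country`, `flag`) are dropped from the result.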

### 4. Calculate Average Value
### 4. Calculate average value

The following query is similar to the one in the previous step, but this time we will use the aggregation function called AVG\(\) to get the average value of the records ingested:
Run the following command to use the `AVG` aggregation function to get the average value of ingested records:

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
@@ -104,7 +103,7 @@ $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-o null -f 1
```

output:
The previous query yields the following output:

```text
[0] [1557323573.940149, {"AVG(num)"=>61.230770}]
@@ -114,11 +113,13 @@ output:
[0] [1557323573.945130, {"AVG(num)"=>99.000000}]
```

why did we get multiple records? Answer: When Fluent Bit processes the data, records come in chunks and the Stream Processor runs the process over chunks of data, so the input plugin ingested 5 chunks of records and SP processed the query for each chunk independently. To process multiple chunks at once we have to group results during windows of time.
{% hint style="info" %}
The resulting output contains multiple records because Fluent Bit processes data in chunks, and the stream processor processes each chunk independently. To process multiple chunks at the same time, you can group results using time windows.
{% endhint %}
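The difference between averaging each chunk independently and averaging all records together can be seen in a small Python sketch. The chunk contents are hypothetical, and `avg_per_chunk` is an illustrative helper rather than anything from Fluent Bit.

```python
def avg_per_chunk(chunks):
    """Average num within each chunk separately, one result per chunk."""
    return [
        sum(r["num"] for r in chunk) / len(chunk)
        for chunk in chunks
        if chunk  # skip empty chunks to avoid dividing by zero
    ]


# Hypothetical chunks: two records arrive together, then one alone.
chunks = [
    [{"num": 10}, {"num": 20}],
    [{"num": 99}],
]

per_chunk = avg_per_chunk(chunks)

# Averaging across all chunks at once gives a single, different value.
all_records = [r for chunk in chunks for r in chunk]
overall = sum(r["num"] for r in all_records) / len(all_records)

print(per_chunk, overall)
```

A chunk containing a single record produces an average equal to that record's value, which would explain a result such as `99.000000` in the output above.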

### 5. Grouping Results and Window
### 5. Group results and windows

Grouping results aims to simplify data processing and when used in a defined window of time we can achieve great things. The next query group the results by _country_ and calculate the average of _num_ value, the processing window is 1 second which basically means: process all incoming chunks coming within 1 second window:
Grouping results within a time window simplifies data processing. Run the following command to group results by `country` and calculate the average of `num` with a one-second processing window:

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
@@ -136,17 +137,17 @@ $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-o null -f 1
```

output:
The previous query yields the following output:

```text
[0] [1557324239.003211, {"country"=>"Chile", "AVG(num)"=>53.164558}]
```
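Grouping inside a tumbling window can be sketched as bucketing events into fixed, non-overlapping intervals and then averaging per group. The Python below is a simplified model under stated assumptions: it buckets by each event's own timestamp, the event data is hypothetical, and `tumbling_avg` is an illustrative name. How Fluent Bit assigns records to windows internally may differ.

```python
from collections import defaultdict


def tumbling_avg(events, window_seconds=1):
    """Average num per (window start, country) over fixed-size windows."""
    buckets = defaultdict(list)
    for timestamp, record in events:
        # Each event belongs to exactly one window: windows do not overlap.
        window_start = int(timestamp // window_seconds) * window_seconds
        buckets[(window_start, record["country"])].append(record["num"])
    return {key: sum(nums) / len(nums) for key, nums in sorted(buckets.items())}


# Hypothetical (timestamp, record) pairs.
events = [
    (100.1, {"country": "Chile", "num": 40}),
    (100.5, {"country": "Japan", "num": 10}),
    (100.9, {"country": "Chile", "num": 60}),
    (101.2, {"country": "Chile", "num": 80}),
]

averages = tumbling_avg(events)
print(averages)
```

Because the sample file is read in well under a second, all of its chunks would fall into one window, which is consistent with the single result line above.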

### 6. Ingest Stream Processor results as new Stream of Data
### 6. Ingest stream processor results as a new stream of data

Now we see a more real-world use case. Sending data results to the standard output interface is good for learning purposes, but now we will instruct the Stream Processor to ingest results as part of Fluent Bit data pipeline and attach a Tag to them.
Next, instruct the stream processor to ingest results as part of the Fluent Bit data pipeline and assign a tag to each record.

This can be done using the **CREATE STREAM** statement that will also tag results with **sp-results** value. Note that output plugin parameter is now _stdout_ matching all records tagged with _sp-results_:
Run the following command, which uses a `CREATE STREAM` statement to tag results with the `sp-results` tag, then outputs records with that tag to standard output:

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
@@ -166,34 +167,8 @@ $ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
-o stdout -m 'sp-results' -f 1
```

output:
The previous query yields the following results:

```text
[0] sp-results: [1557325032.000160100, {"country"=>"Chile", "AVG(num)"=>53.164558}]
```
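The effect of tagging results and matching them at the output can be modelled as routing by tag. The Python sketch below is a loose analogy: `route` is a hypothetical helper, and using `fnmatchcase` to stand in for the match rule is an assumption, not Fluent Bit's actual matching code.

```python
from fnmatch import fnmatchcase


def route(tagged_records, match):
    """Return the records whose tag satisfies the output's match pattern."""
    return [record for tag, record in tagged_records if fnmatchcase(tag, match)]


# Raw input carries the input plugin's tag; query results carry the new tag.
pipeline = [
    ("tail.0", {"word": "fendered", "country": "United States", "num": 53}),
    ("sp-results", {"country": "Chile", "AVG(num)": 53.164558}),
]

to_stdout = route(pipeline, "sp-results")
print(to_stdout)
```

Only the re-ingested query result reaches the output, while the raw `tail.0` records are not printed.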

## F.A.Q

### Where STREAM name comes from?

Fluent Bit have the notion of streams, and every input plugin instance gets a default name. You can override that behavior by setting an alias. Check the **alias** parameter and new **stream** name in the following example:

```bash
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
fluent/fluent-bit:1.8.2 \
/fluent-bit/bin/fluent-bit \
-R /fluent-bit/etc/parsers.conf \
-i tail \
-p path=/sp-samples-1k.log \
-p parser=json \
-p read_from_head=true \
-p alias=samples \
-T "CREATE STREAM results WITH (tag='sp-results') \
AS \
SELECT country, AVG(num) FROM STREAM:samples \
WINDOW TUMBLING (1 SECOND) \
WHERE country='Chile' \
GROUP BY country;" \
-o stdout -m 'sp-results' -f 1
```