add sample #42


Merged 2 commits on Jan 23, 2024
13 changes: 13 additions & 0 deletions example/descriptive_pipeline/Dockerfile
FROM python:3.9.10

ARG VERSION

WORKDIR /timeplus
ADD ./requirements.txt /timeplus
RUN pip3 install -r requirements.txt
ADD ./app /timeplus/app/
ADD ./server /timeplus/server/

EXPOSE 5001

ENTRYPOINT ["uvicorn", "server.main:app", "--host", "0.0.0.0", "--port", "5001", "--http", "h11"]
23 changes: 23 additions & 0 deletions example/descriptive_pipeline/Makefile
VERSION = $(shell git rev-parse --short HEAD)
BIN_NAME = proton-pipeline-service
IMAGE_NAME = $(BIN_NAME):$(VERSION)
DOCKER_ID_USER = timeplus
FULLNAME=$(DOCKER_ID_USER)/${IMAGE_NAME}

.PHONY: service

service:
uvicorn server.main:app --port 5001 --host 0.0.0.0 --http h11 --reload

proton:
docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:latest

docker: Dockerfile
docker build -t $(IMAGE_NAME) .

docker_run:
docker run -p 5001:5001 $(IMAGE_NAME)

push:
docker tag $(IMAGE_NAME) $(FULLNAME)
docker push $(FULLNAME)
99 changes: 99 additions & 0 deletions example/descriptive_pipeline/README.md
# Descriptive Pipeline

In this sample, we leverage the following open-source components to build a descriptive, streaming-SQL-based pipeline, and expose the query results as WebSocket or HTTP stream API endpoints.

- FastAPI
- Proton


## quick start

To run this sample, you need `python`.

1. run `pip install -r requirements.txt` to install all the dependencies
2. run `docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:latest` to start a Proton Docker instance with port `8463` exposed; the Python driver will use this port
3. configure your streaming pipelines in [config.yaml](config.yaml)
4. run `uvicorn server.main:app --port 5001 --host 0.0.0.0 --http h11 --reload` to start the server, which hosts the SQL pipelines using FastAPI and exposes WebSocket and HTTP stream API endpoints
5. run `wscat -c ws://localhost:5001/queries/<pipeline_name>` to consume the WebSocket streaming result from the pipeline
6. run `curl http://localhost:5001/queries/<pipeline_name>` to consume the HTTP stream result from the pipeline
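As an alternative to `curl`, the HTTP stream endpoint can also be consumed from Python. The sketch below is for illustration and is not part of the sample's code; it assumes the server from step 4 is running on `localhost:5001` and uses the third-party `requests` package.

```python
import json


def parse_json_lines(lines):
    """Parse an iterable of JSON-encoded lines, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield json.loads(line)


def consume(pipeline, host="localhost", port=5001, limit=5):
    """Read up to `limit` results from an HTTP stream endpoint and print them."""
    import requests  # third-party; pip install requests

    url = f"http://{host}:{port}/queries/{pipeline}"
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()
        lines = resp.iter_lines(decode_unicode=True)
        for i, event in enumerate(parse_json_lines(lines)):
            print(event)
            if i + 1 >= limit:
                break
```

With the server running, calling `consume("pipeline2")` would print the first few window-count results and then close the connection; since the query is unbounded, the client decides when to stop reading.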

## pipeline description

You can configure your pipelines in YAML format; here is a sample pipeline for your reference.

```yaml
pipelines:
- name: pipeline1
sqls:
- |
CREATE RANDOM STREAM IF NOT EXISTS devices(
device string default 'device'||to_string(rand()%4),
temperature float default rand()%1000/10
)
- |
SELECT * FROM devices
- name: pipeline2
sqls:
- |
CREATE RANDOM STREAM IF NOT EXISTS devices(
device string default 'device'||to_string(rand()%4),
temperature float default rand()%1000/10
)
- |
SELECT
window_start,
count(*) as count
FROM
tumble(devices, 1s)
GROUP BY
window_start
- name: pipeline3
sqls:
- |
SELECT 1
```

1. you can define multiple pipelines
2. each pipeline has a unique name; this name is used in the URL of the WebSocket or HTTP stream endpoint to identify which pipeline to consume
3. pipeline execution is triggered by an API call
4. each pipeline contains a list of SQL statements to run; you can use DDL to create streams, external streams, views, and materialized views, but the last statement, and only the last statement, should be the query that returns the streaming or historical result

In the above case, we defined 3 pipelines:

- pipeline1 : create a random stream -> run a streaming query to tail all data on that stream
- pipeline2 : create a random stream -> run a tumble window to calculate the count of events in each window
- pipeline3 : run a historical query `select 1`, which is usually used to quickly test whether the SQL service is working
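Before the server runs a config like the one above, it is useful to check that pipeline names are unique and that each pipeline has at least one SQL statement. The helper below is an illustrative sketch, not part of the sample's server code (the function name `validate_pipelines` is hypothetical); it operates on the dict that `yaml.safe_load` would return for `config.yaml`.

```python
def validate_pipelines(config):
    """Validate a parsed pipeline config: every pipeline needs a unique
    name and a non-empty list of SQL statements (the last statement is
    the one whose result is streamed). Returns a name -> sqls mapping."""
    pipelines = config.get("pipelines", [])
    seen = set()
    for p in pipelines:
        name = p.get("name")
        if not name:
            raise ValueError("pipeline is missing a name")
        if name in seen:
            raise ValueError(f"duplicate pipeline name: {name}")
        seen.add(name)
        if not p.get("sqls"):
            raise ValueError(f"pipeline {name} has no SQL statements")
    return {p["name"]: p["sqls"] for p in pipelines}
```

For example, `validate_pipelines({"pipelines": [{"name": "pipeline3", "sqls": ["SELECT 1"]}]})` returns `{"pipeline3": ["SELECT 1"]}`, while a config with two pipelines named `pipeline1` raises a `ValueError`.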


## streaming result

In this sample, all query results are returned as lines of JSON objects. For example:

WebSocket:

```shell
wscat -c ws://localhost:5001/queries/pipeline1
Connected (press CTRL+C to quit)
< {'device': 'device1', 'temperature': 16.899999618530273, '_tp_time': '2024-01-23 02:50:37.798000+00:00'}
< {'device': 'device2', 'temperature': 55.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'}
< {'device': 'device2', 'temperature': 33.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'}
< {'device': 'device3', 'temperature': 59.900001525878906, '_tp_time': '2024-01-23 02:50:37.798000+00:00'}
< {'device': 'device0', 'temperature': 92.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'}
< {'device': 'device1', 'temperature': 11.699999809265137, '_tp_time': '2024-01-23 02:50:37.803000+00:00'}
< {'device': 'device2', 'temperature': 23.399999618530273, '_tp_time': '2024-01-23 02:50:37.803000+00:00'}
< {'device': 'device3', 'temperature': 37.900001525878906, '_tp_time': '2024-01-23 02:50:37.803000+00:00'}
< {'device': 'device1', 'temperature': 77.69999694824219, '_tp_time': '2024-01-23 02:50:37.803000+00:00'}
< {'device': 'device3', 'temperature': 13.899999618530273, '_tp_time': '2024-01-23 02:50:37.803000+00:00'}
< {'device': 'device2', 'temperature': 84.19999694824219, '_tp_time': '2024-01-23 02:50:37.808000+00:00'}

```

HTTP stream:

```shell
curl http://localhost:5001/queries/pipeline2
{"window_start": "2024-01-23 02:52:07+00:00", "count": 580}
{"window_start": "2024-01-23 02:52:08+00:00", "count": 1000}
{"window_start": "2024-01-23 02:52:09+00:00", "count": 1000}
```
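The window counts above come from pipeline2's `tumble(devices, 1s)` aggregation: each event is assigned to the fixed, non-overlapping 1-second window containing its timestamp, and events are counted per window. The plain-Python sketch below mimics that semantics for clarity; it is not how Proton implements tumble windows.

```python
from collections import Counter
from datetime import datetime, timezone


def tumble_counts(timestamps, size_s=1):
    """Assign each event timestamp to a fixed, non-overlapping window of
    `size_s` seconds and count events per window, mimicking
    `tumble(stream, 1s) ... GROUP BY window_start`."""
    counts = Counter()
    for ts in timestamps:
        # Floor the epoch timestamp to the window boundary.
        window_start = int(ts.timestamp() // size_s) * size_s
        counts[datetime.fromtimestamp(window_start, tz=timezone.utc)] += 1
    return dict(counts)


events = [
    datetime(2024, 1, 23, 2, 52, 7, 100000, tzinfo=timezone.utc),
    datetime(2024, 1, 23, 2, 52, 7, 900000, tzinfo=timezone.utc),
    datetime(2024, 1, 23, 2, 52, 8, 500000, tzinfo=timezone.utc),
]
print(tumble_counts(events))
```

The first two events fall into the window starting at `02:52:07` and the third into the window starting at `02:52:08`, matching the shape of the `curl` output above.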
35 changes: 35 additions & 0 deletions example/descriptive_pipeline/config.yaml
# host: localhost
# port: 8463
# db: default
# user: default
# password: ""

pipelines:
- name: pipeline1
sqls:
- |
CREATE RANDOM STREAM IF NOT EXISTS devices(
device string default 'device'||to_string(rand()%4),
temperature float default rand()%1000/10
)
- |
SELECT * FROM devices
- name: pipeline2
sqls:
- |
CREATE RANDOM STREAM IF NOT EXISTS devices(
device string default 'device'||to_string(rand()%4),
temperature float default rand()%1000/10
)
- |
SELECT
window_start,
count(*) as count
FROM
tumble(devices, 1s)
GROUP BY
window_start
- name: pipeline3
sqls:
- |
SELECT 1
5 changes: 5 additions & 0 deletions example/descriptive_pipeline/requirements.txt
fastapi==0.75.0
loguru==0.6.0
uvicorn[standard]==0.17.6
retry==0.9.2
proton-driver==0.2.10