|
| 1 | +# Descriptive Pipeline |
| 2 | + |
| 3 | +In this sample, we leverage following open source components to build a descriptive streaming SQL based pipeline, and exposed the query result as WebSocket or http stream API endpoint. |
| 4 | + |
| 5 | +- FastAPI |
| 6 | +- Proton |
| 7 | + |
| 8 | + |
| 9 | +## quick start |
| 10 | + |
| 11 | +to run this sample, you need `python` |
| 12 | + |
| 13 | +1. run `pip install -r requirements.txt` to install all the dependencies |
| 14 | +2. run `docker run -d -p 8463:8463 --pull always --name proton ghcr.io/timeplus-io/proton:latest` to start a proton docker instance, with port `8643` exposed, the python driver will use this port |
| 15 | +3. configure your streaming pipeline in [config.yaml](config.yaml) |
| 16 | +4. run `uvicorn server.main:app --port 5001 --host 0.0.0.0 --http h11 --reload` to start the server which will be hosting the sql pipeline using FastAPI and expose WebSocket and HTTP stream api endpoint |
| 17 | +5. run `wscat -c ws://localhost:5001/queries/<pipeline_name>` to consume the WebSocket streaming result from the pipeline |
| 18 | +6. run `curl http://localhost:5001/queries/<pipeline_name>` to consume the HTTP stream result from the pipeline |
| 19 | + |
| 20 | +## pipeline description |
| 21 | + |
| 22 | +you can configure your pipeline in format of yaml, here is the sample pipeline for your reference. |
| 23 | + |
| 24 | +```yaml |
| 25 | +pipelines: |
| 26 | + - name: pipeline1 |
| 27 | + sqls: |
| 28 | + - | |
| 29 | + CREATE RANDOM STREAM IF NOT EXISTS devices( |
| 30 | + device string default 'device'||to_string(rand()%4), |
| 31 | + temperature float default rand()%1000/10 |
| 32 | + ) |
| 33 | + - | |
| 34 | + SELECT * FROM devices |
| 35 | + - name: pipeline2 |
| 36 | + sqls: |
| 37 | + - | |
| 38 | + CREATE RANDOM STREAM IF NOT EXISTS devices( |
| 39 | + device string default 'device'||to_string(rand()%4), |
| 40 | + temperature float default rand()%1000/10 |
| 41 | + ) |
| 42 | + - | |
| 43 | + SELECT |
| 44 | + window_start, |
| 45 | + count(*) as count |
| 46 | + FROM |
| 47 | + tumble(devices, 1s) |
| 48 | + GROUP BY |
| 49 | + window_start |
| 50 | + - name: pipeline3 |
| 51 | + sqls: |
| 52 | + - | |
| 53 | + SELECT 1 |
| 54 | +``` |
| 55 | +
|
| 56 | +1. you can define multiple pipelines |
| 57 | +2. each pipeline has a unqiue name, this name will be used in the url of WebScoket or HTTP stream endpoint to identify which pipeline to consume |
| 58 | +3. pipeline execution is triggerred by API call |
| 59 | +4. each pipeline contains a list of SQL queries to run, you can call DDL create streams, external streams, views, materialized views and query in your pipeline, the last query and only the last query should be the query that return streaming or historical query result. |
| 60 | +
|
| 61 | +in the above case, we defined 3 pipelines |
| 62 | +
|
| 63 | +- pipeline1 : create a random stream -> run a streaming query to tail all data on that stream |
| 64 | +- pipeline2 : create a random stream -> run a tumble window to caculate the count of event in each window |
| 65 | +- pipeline3 : run a historical query `select 1` which is usually used to quick test if the SQL is working or not |
| 66 | + |
| 67 | + |
| 68 | +## streaming result |
| 69 | + |
| 70 | +In this sample, all query results are returned in lines of json object. for example: |
| 71 | + |
| 72 | + |
| 73 | +Websocket: |
| 74 | + |
| 75 | +```shell |
| 76 | +wscat -c ws://localhost:5001/queries/pipeline1 |
| 77 | +Connected (press CTRL+C to quit) |
| 78 | +< {'device': 'device1', 'temperature': 16.899999618530273, '_tp_time': '2024-01-23 02:50:37.798000+00:00'} |
| 79 | +< {'device': 'device2', 'temperature': 55.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'} |
| 80 | +< {'device': 'device2', 'temperature': 33.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'} |
| 81 | +< {'device': 'device3', 'temperature': 59.900001525878906, '_tp_time': '2024-01-23 02:50:37.798000+00:00'} |
| 82 | +< {'device': 'device0', 'temperature': 92.0, '_tp_time': '2024-01-23 02:50:37.798000+00:00'} |
| 83 | +< {'device': 'device1', 'temperature': 11.699999809265137, '_tp_time': '2024-01-23 02:50:37.803000+00:00'} |
| 84 | +< {'device': 'device2', 'temperature': 23.399999618530273, '_tp_time': '2024-01-23 02:50:37.803000+00:00'} |
| 85 | +< {'device': 'device3', 'temperature': 37.900001525878906, '_tp_time': '2024-01-23 02:50:37.803000+00:00'} |
| 86 | +< {'device': 'device1', 'temperature': 77.69999694824219, '_tp_time': '2024-01-23 02:50:37.803000+00:00'} |
| 87 | +< {'device': 'device3', 'temperature': 13.899999618530273, '_tp_time': '2024-01-23 02:50:37.803000+00:00'} |
| 88 | +< {'device': 'device2', 'temperature': 84.19999694824219, '_tp_time': '2024-01-23 02:50:37.808000+00:00'} |
| 89 | +
|
| 90 | +``` |
| 91 | + |
| 92 | +HTTP stream: |
| 93 | + |
| 94 | +```shell |
| 95 | +curl http://localhost:5001/queries/pipeline2 |
| 96 | +{"window_start": "2024-01-23 02:52:07+00:00", "count": 580} |
| 97 | +{"window_start": "2024-01-23 02:52:08+00:00", "count": 1000} |
| 98 | +{"window_start": "2024-01-23 02:52:09+00:00", "count": 1000} |
| 99 | +``` |
0 commit comments