
Commit 20dd99d

Update to use bw 0.18 (#40)
* update bytewax 0.18
* readme fixes
1 parent c7a1dd6 commit 20dd99d

5 files changed: +127 −92 lines changed


example/bytewax/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -38,4 +38,4 @@ USER appuser
 COPY . .
 
 # Run the application.
-CMD python -m bytewax.run "hackernews:run_hn_flow(0)"
+CMD python -m bytewax.run hackernews.py
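
The new `CMD` reflects how bytewax 0.18's runner is usually invoked: point it at a file that defines a module-level `Dataflow` (by default a variable named `flow`), rather than at a factory function. A minimal sketch of what `python -m bytewax.run <file>.py` expects to find; the file name, step names, and test data here are illustrative and not part of this commit:

```python
# minimal_flow.py -- sketch of a module-level flow that `python -m bytewax.run minimal_flow.py`
# can pick up under bytewax 0.18 (assumes the 0.18 connector names StdOutSink/TestingSource).
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("minimal_flow")  # module-level `flow` is what the runner looks for
nums = op.input("in", flow, TestingSource([1, 2, 3]))
doubled = op.map("double", nums, lambda x: x * 2)
op.output("out", doubled, StdOutSink())
```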

example/bytewax/README.md

Lines changed: 41 additions & 17 deletions
@@ -17,43 +17,67 @@ Simply run `docker compose up` in this folder and it will start both Proton and
 python3.10 -m venv py310-env
 source py310-env/bin/activate
 #git clone and cd to this proton-python-driver/example/bytewax folder
-pip install bytewax
+pip install bytewax==0.18
 pip install requests
 pip install proton-driver
 
-python -m bytewax.run "hackernews:run_hn_flow(0)"
+python -m bytewax.run hackernews.py
 ```
-It will load ~100 items every 15 second and send the data to Proton.
+It will load new items every 15 seconds and send the data to Proton.
 
 ## How it works
 
 ```python
-flow.output("out", ProtonOutput(HOST, "hn"))
+op.output("stories-out", story_stream, ProtonSink("hn_stories", os.environ["PROTON_HOST"]))
 ```
-`hn` is an example stream name. The `ProtonOutput` will create the stream if it doesn't exist
+`hn_stories` is an example stream name. The `ProtonSink` will create the stream if it doesn't exist.
 ```python
-class _ProtonSink(StatelessSink):
+class _ProtonSinkPartition(StatelessSinkPartition):
     def __init__(self, stream: str, host: str):
-        self.client=client.Client(host=HOST, port=8463)
+        self.client=client.Client(host=host, port=8463)
         self.stream=stream
         sql=f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
+        logger.debug(sql)
         self.client.execute(sql)
 ```
 and batch insert data
 ```python
-    def write_batch(self, items):
-        rows=[]
-        for item in items:
-            rows.append([item]) # single column in each row
-        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
-        self.client.execute(sql,rows)
+    def write_batch(self, items):
+        rows=[]
+        for item in items:
+            rows.append([item]) # single column in each row
+        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
+        # logger.debug(f"inserting data {sql}")
+        self.client.execute(sql,rows)
 ```
 
 ```python
-class ProtonOutput(DynamicOutput):
+class ProtonSink(DynamicSink):
     def __init__(self, stream: str, host: str):
-        self.stream=stream
-        self.host=host
+        self.stream = stream
+        self.host = host if host is not None and host != "" else "127.0.0.1"
+
+
     def build(self, worker_index, worker_count):
-        return _ProtonSink(self.stream, self.host)
+        """See ABC docstring."""
+        return _ProtonSinkPartition(self.stream, self.host)
+```
+
+### Querying and visualizing with Grafana
+
+First, you will need to follow the setup instructions listed [here](https://github.com/timeplus-io/proton/blob/develop/examples/grafana/README.md). Once set up, you can:
+
+start Grafana,
+
+open Grafana (http://localhost:3000) in your browser and add the Proton data source, and
+
+in the Explore tab, run the query below as a live query.
+
+```
+select
+  raw:id as story_id,
+  raw:url as url,
+  raw:title as title,
+  raw:by as author
+from hn_stories
 ```
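
Besides Grafana, the inserted rows can be checked directly from the host with `proton-driver`, since the compose file now publishes port 8463. A rough verification snippet; the `table()` wrapper for a bounded historical read, the host value, and the result-tuple unpacking are assumptions based on the setup above rather than something this commit ships:

```python
# verify_ingest.py -- rough check that hackernews.py is writing into Proton
from proton_driver import client

c = client.Client(host="127.0.0.1", port=8463)  # native TCP port published in compose.yaml

# table() is assumed here to read the stream as a bounded table instead of tailing it live
rows = c.execute("SELECT raw:title AS title FROM table(hn_stories) LIMIT 5")
for (title,) in rows:
    print(title)
```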

example/bytewax/compose.yaml

Lines changed: 5 additions & 1 deletion
@@ -1,7 +1,11 @@
 services:
   proton:
     image: ghcr.io/timeplus-io/proton:latest
-  hn_datagen:
+    pull_policy: always
+    ports:
+      - 8463:8463
+      - 3218:3218
+  hn_stream:
     build:
       context: .
     image: timeplus/hackernews_bytewax:latest

example/bytewax/hackernews.py

Lines changed: 72 additions & 66 deletions
@@ -1,80 +1,86 @@
-import requests
-from datetime import datetime, timedelta
-import time, json, os
-from typing import Any, Optional
+import logging
+from datetime import timedelta
+from typing import Optional, Tuple
+import os
+import json
 
-from bytewax.connectors.periodic import SimplePollingInput
+import requests
+from bytewax import operators as op
 from bytewax.dataflow import Dataflow
+from bytewax.inputs import SimplePollingSource
 
-import logging
-
-from proton import ProtonOutput
+from proton import ProtonSink
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
-# logger.setLevel(logging.DEBUG)
-
-class HNInput(SimplePollingInput):
-    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
-        super().__init__(interval, align_to)
-        '''
-        By default, only get the recent events
-        '''
-        if not init_item or init_item == 0:
-            self.max_id = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()*0.999998)
-        else:
-            self.max_id = init_item
-        logger.info(f"received starting id: {init_item}")
-
 
+
+class HNSource(SimplePollingSource):
     def next_item(self):
-        '''
-        Get all the items from hacker news API between the last max id and the current max id.
-        '''
-        new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
-        logger.info(f"current id: {self.max_id}, new id: {new_max_id}. {new_max_id-self.max_id} items to fetch")
-        ids = [int(i) for i in range(self.max_id, new_max_id)]
-        self.max_id = new_max_id
-        logger.debug(ids)
-        return ids
-
-def download_metadata(hn_id):
+        return (
+            "GLOBAL_ID",
+            requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json(),
+        )
+
+
+def get_id_stream(old_max_id, new_max_id) -> Tuple[str,list]:
+    if old_max_id is None:
+        # Get the last 10 items on the first run.
+        old_max_id = new_max_id - 10
+    return (new_max_id, range(old_max_id, new_max_id))
+
+
+def download_metadata(hn_id) -> Optional[Tuple[str, dict]]:
     # Given an hacker news id returned from the api, fetch metadata
-    logger.info(f"Fetching https://hacker-news.firebaseio.com/v0/item/{hn_id}.json")
-    req = requests.get(
+    # Try 3 times, waiting more and more, or give up
+    data = requests.get(
         f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
-    )
-    if not req.json():
-        logger.warning(f"error getting payload from item {hn_id} trying again")
-        time.sleep(0.5)
-        return download_metadata(hn_id)
-    return req.json()
-
-def recurse_tree(metadata):
+    ).json()
+
+    if data is None:
+        logger.warning(f"Couldn't fetch item {hn_id}, skipping")
+        return None
+    return (str(hn_id), data)
+
+
+def recurse_tree(metadata, og_metadata=None) -> any:
+    if not og_metadata:
+        og_metadata = metadata
     try:
         parent_id = metadata["parent"]
         parent_metadata = download_metadata(parent_id)
-        return recurse_tree(parent_metadata)
+        return recurse_tree(parent_metadata[1], og_metadata)
     except KeyError:
-        return (metadata["id"], {**metadata, "key_id": metadata["id"]})
-
-def key_on_parent(metadata: dict) -> tuple:
-    key, metadata = recurse_tree(metadata)
-    return json.dumps(metadata, indent=4, sort_keys=True)
-
-def run_hn_flow(init_item):
-    flow = Dataflow()
-    flow.input("in", HNInput(timedelta(seconds=15), None, init_item)) # skip the align_to argument
-    flow.flat_map(lambda x: x)
-    # If you run this dataflow with multiple workers, downloads in
-    # the next `map` will be parallelized thanks to .redistribute()
-    flow.redistribute()
-    flow.map(download_metadata)
-    flow.inspect(logger.debug)
-
-    # We want to keep related data together so let's build a
-    # traversal function to get the ultimate parent
-    flow.map(key_on_parent)
-
-    flow.output("out", ProtonOutput("hn",os.environ["PROTON_HOST"]))
-    return flow
+        return (metadata["id"],
+            {
+                **og_metadata,
+                "root_id":metadata["id"]
+            }
+        )
+
+
+def key_on_parent(key__metadata) -> tuple:
+    key, metadata = recurse_tree(key__metadata[1])
+    return (str(key), metadata)
+
+
+def format(id__metadata):
+    id, metadata = id__metadata
+    return json.dumps(metadata)
+
+
+flow = Dataflow("hn_scraper")
+max_id = op.input("in", flow, HNSource(timedelta(seconds=15)))
+id_stream = op.stateful_map("range", max_id, lambda: None, get_id_stream).then(
+    op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1]).then(
+    op.redistribute, "redist")
+id_stream = op.filter_map("meta_download", id_stream, download_metadata)
+split_stream = op.branch("split_comments", id_stream, lambda item: item[1]["type"] == "story")
+story_stream = split_stream.trues
+story_stream = op.map("format_stories", story_stream, format)
+comment_stream = split_stream.falses
+comment_stream = op.map("key_on_parent", comment_stream, key_on_parent)
+comment_stream = op.map("format_comments", comment_stream, format)
+op.inspect("stories", story_stream)
+op.inspect("comments", comment_stream)
+op.output("stories-out", story_stream, ProtonSink("hn_stories", os.environ["PROTON_HOST"]))
+op.output("comments-out", comment_stream, ProtonSink("hn_comments", os.environ["PROTON_HOST"]))
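
The rewritten flow leans on `op.stateful_map` to remember the previous max item id between polls: the builder creates the initial state (`None` above) and the mapper returns `(updated_state, emitted_value)` for each keyed item. A standalone sketch of that contract under the same 0.18 API, using a running total over a keyed test stream; the step names, test data, and stdout sink are illustrative, not from the commit:

```python
# stateful_map_sketch.py -- illustrates the builder/mapper contract used by the "range" step above
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("running_total")
# stateful_map operates on (key, value) pairs, like HNSource's ("GLOBAL_ID", max_id)
nums = op.input("in", flow, TestingSource([("GLOBAL_ID", 1), ("GLOBAL_ID", 2), ("GLOBAL_ID", 3)]))

def add(total, x):
    total += x
    return (total, total)  # (updated state, value emitted downstream)

totals = op.stateful_map("sum", nums, lambda: 0, add)  # builder returns the initial state
op.output("out", totals, StdOutSink())
```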

example/bytewax/proton.py

Lines changed: 8 additions & 7 deletions
@@ -1,15 +1,15 @@
 """Output to Timeplus Proton."""
-from bytewax.outputs import DynamicOutput, StatelessSink
+from bytewax.outputs import DynamicSink, StatelessSinkPartition
 from proton_driver import client
 import logging
 
 __all__ = [
-    "ProtonOutput",
+    "ProtonSink",
 ]
 logger = logging.getLogger(__name__)
-# logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.DEBUG)
 
-class _ProtonSink(StatelessSink):
+class _ProtonSinkPartition(StatelessSinkPartition):
     def __init__(self, stream: str, host: str):
         self.client=client.Client(host=host, port=8463)
         self.stream=stream
@@ -18,14 +18,15 @@ def __init__(self, stream: str, host: str):
         self.client.execute(sql)
 
     def write_batch(self, items):
+        logger.debug(f"inserting data {items}")
         rows=[]
         for item in items:
             rows.append([item]) # single column in each row
         sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
-        # logger.debug(f"inserting data {sql}")
+        logger.debug(f"inserting data {sql}")
         self.client.execute(sql,rows)
 
-class ProtonOutput(DynamicOutput):
+class ProtonSink(DynamicSink):
     def __init__(self, stream: str, host: str):
         self.stream = stream
         self.host = host if host is not None and host != "" else "127.0.0.1"
@@ -44,4 +45,4 @@ def __init__(self, stream: str, host: str):
 
     def build(self, worker_index, worker_count):
         """See ABC docstring."""
-        return _ProtonSink(self.stream, self.host)
+        return _ProtonSinkPartition(self.stream, self.host)
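
Because the renamed `ProtonSink` is a `DynamicSink`, it can be dropped into any 0.18 dataflow as an `op.output` step on its own. A small smoke-test flow against a local Proton instance; the stream name, test payloads, and reliance on the empty-host fallback are assumptions for illustration, not part of this commit:

```python
# sink_smoke_test.py -- minimal flow exercising ProtonSink by itself
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

from proton import ProtonSink

flow = Dataflow("proton_sink_smoke_test")
items = op.input("in", flow, TestingSource(['{"msg": "hello"}', '{"msg": "world"}']))
# Passing "" makes ProtonSink fall back to 127.0.0.1 (see __init__ above);
# each item lands as one row in the single `raw` column of the `smoke_test` stream.
op.output("out", items, ProtonSink("smoke_test", ""))
```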

0 commit comments
