Update to use bw 0.18 #40

Merged
merged 3 commits on Jan 12, 2024
2 changes: 1 addition & 1 deletion example/bytewax/Dockerfile
@@ -38,4 +38,4 @@ USER appuser
COPY . .

# Run the application.
CMD python -m bytewax.run "hackernews:run_hn_flow(0)"
CMD python -m bytewax.run hackernews.py
58 changes: 41 additions & 17 deletions example/bytewax/README.md
@@ -17,43 +17,67 @@ Simply run `docker compose up` in this folder and it will start both Proton and
python3.10 -m venv py310-env
source py310-env/bin/activate
#git clone and cd to this proton-python-driver/example/bytewax folder
pip install bytewax
pip install bytewax==0.18
pip install requests
pip install proton-driver

python -m bytewax.run "hackernews:run_hn_flow(0)"
python -m bytewax.run hackernews.py
```
It will load ~100 items every 15 seconds and send the data to Proton.
It will load new items every 15 seconds and send the data to Proton.
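
Under the hood, the 15-second cadence comes from bytewax 0.18's `SimplePollingSource`, which `hackernews.py` subclasses. As a rough, self-contained sketch of that pattern (printing to stdout instead of Proton; the `MaxIdSource` and `poll_demo` names are made up for illustration):

```python
from datetime import timedelta

import requests
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource


class MaxIdSource(SimplePollingSource):
    def next_item(self):
        # Called once per poll interval; return the current max Hacker News item id.
        return requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()


# Run with: python -m bytewax.run <this_file>.py
flow = Dataflow("poll_demo")
max_id = op.input("in", flow, MaxIdSource(timedelta(seconds=15)))
op.output("out", max_id, StdOutSink())
```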

## How it works

```python
flow.output("out", ProtonOutput(HOST, "hn"))
op.output("stories-out", story_stream, ProtonSink("hn_stories", os.environ["PROTON_HOST"]))
```
`hn` is an example stream name. The `ProtonOutput` will create the stream if it doesn't exist
`hn_stories` is an example stream name. The `ProtonSink` will create the stream if it doesn't exist
```python
class _ProtonSink(StatelessSink):
class _ProtonSinkPartition(StatelessSinkPartition):
    def __init__(self, stream: str, host: str):
        self.client=client.Client(host=HOST, port=8463)
        self.client=client.Client(host=host, port=8463)
        self.stream=stream
        sql=f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
        logger.debug(sql)
        self.client.execute(sql)
```
and batch insert data
```python
    def write_batch(self, items):
        rows=[]
        for item in items:
            rows.append([item]) # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        self.client.execute(sql,rows)
    def write_batch(self, items):
        rows=[]
        for item in items:
            rows.append([item]) # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        # logger.debug(f"inserting data {sql}")
        self.client.execute(sql,rows)
```
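For orientation, here is roughly what one `write_batch` call amounts to when done by hand with `proton-driver`; the `hn_test` stream name and the sample items are made up:

```python
from proton_driver import client

c = client.Client(host="127.0.0.1", port=8463)
c.execute("CREATE STREAM IF NOT EXISTS `hn_test` (raw string)")

# Each item becomes a single-column row; the whole list is sent as one batch.
items = ['{"id": 1, "title": "hello"}', '{"id": 2, "title": "world"}']
rows = [[item] for item in items]
c.execute("INSERT INTO `hn_test` (raw) VALUES", rows)
```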

```python
class ProtonOutput(DynamicOutput):
class ProtonSink(DynamicSink):
    def __init__(self, stream: str, host: str):
        self.stream=stream
        self.host=host
        self.stream = stream
        self.host = host if host is not None and host != "" else "127.0.0.1"


    def build(self, worker_index, worker_count):
        return _ProtonSink(self.stream, self.host)
        """See ABC docstring."""
        return _ProtonSinkPartition(self.stream, self.host)
```
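For context, a minimal sketch of wiring such a sink into a bytewax 0.18 dataflow; the `TestingSource` items and the `hn_demo` stream name are made up for illustration:

```python
import os

from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

from proton import ProtonSink

# Two hard-coded JSON strings stand in for real Hacker News items.
flow = Dataflow("proton_sink_demo")
items = op.input("in", flow, TestingSource(['{"id": 1}', '{"id": 2}']))
op.output("out", items, ProtonSink("hn_demo", os.environ.get("PROTON_HOST", "127.0.0.1")))
```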

### Querying and visualizing with Grafana

First, you will need to follow the setup instructions listed [here](https://github.com/timeplus-io/proton/blob/develop/examples/grafana/README.md). Once set up, you can:

1. Start Grafana.
2. Open Grafana (http://localhost:3000) in your browser and add the Proton data source.
3. In the Explore tab, run the query below as a live query.

```sql
select
  raw:id as story_id,
  raw:url as url,
  raw:title as title,
  raw:by as author
from hn_stories
```
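If you prefer to check the results without Grafana, a similar live query can be issued from Python, assuming proton-driver's streaming `execute_iter` call and a Proton instance reachable on localhost:

```python
from proton_driver import client

c = client.Client(host="127.0.0.1", port=8463)
# The query is unbounded (streaming), so iterate over rows as they arrive.
rows = c.execute_iter(
    "select raw:id as story_id, raw:url as url, raw:title as title, raw:by as author from hn_stories"
)
for story_id, url, title, author in rows:
    print(story_id, title, author, url)
```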
6 changes: 5 additions & 1 deletion example/bytewax/compose.yaml
@@ -1,7 +1,11 @@
services:
  proton:
    image: ghcr.io/timeplus-io/proton:latest
  hn_datagen:
    pull_policy: always
    ports:
      - 8463:8463
      - 3218:3218
  hn_stream:
    build:
      context: .
    image: timeplus/hackernews_bytewax:latest
138 changes: 72 additions & 66 deletions example/bytewax/hackernews.py
@@ -1,80 +1,86 @@
import requests
from datetime import datetime, timedelta
import time, json, os
from typing import Any, Optional
import logging
from datetime import timedelta
from typing import Optional, Tuple
import os
import json

from bytewax.connectors.periodic import SimplePollingInput
import requests
from bytewax import operators as op
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource

import logging

from proton import ProtonOutput
from proton import ProtonSink

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)

class HNInput(SimplePollingInput):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
        super().__init__(interval, align_to)
        '''
        By default, only get the recent events
        '''
        if not init_item or init_item == 0:
            self.max_id = int(requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()*0.999998)
        else:
            self.max_id = init_item
        logger.info(f"received starting id: {init_item}")



class HNSource(SimplePollingSource):
    def next_item(self):
        '''
        Get all the items from hacker news API between the last max id and the current max id.
        '''
        new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        logger.info(f"current id: {self.max_id}, new id: {new_max_id}. {new_max_id-self.max_id} items to fetch")
        ids = [int(i) for i in range(self.max_id, new_max_id)]
        self.max_id = new_max_id
        logger.debug(ids)
        return ids

def download_metadata(hn_id):
        return (
            "GLOBAL_ID",
            requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json(),
        )


def get_id_stream(old_max_id, new_max_id) -> Tuple[str,list]:
    if old_max_id is None:
        # Get the last 10 items on the first run.
        old_max_id = new_max_id - 10
    return (new_max_id, range(old_max_id, new_max_id))


def download_metadata(hn_id) -> Optional[Tuple[str, dict]]:
    # Given an hacker news id returned from the api, fetch metadata
    logger.info(f"Fetching https://hacker-news.firebaseio.com/v0/item/{hn_id}.json")
    req = requests.get(
    # Try 3 times, waiting more and more, or give up
    data = requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    )
    if not req.json():
        logger.warning(f"error getting payload from item {hn_id} trying again")
        time.sleep(0.5)
        return download_metadata(hn_id)
    return req.json()

def recurse_tree(metadata):
    ).json()

    if data is None:
        logger.warning(f"Couldn't fetch item {hn_id}, skipping")
        return None
    return (str(hn_id), data)


def recurse_tree(metadata, og_metadata=None) -> any:
    if not og_metadata:
        og_metadata = metadata
    try:
        parent_id = metadata["parent"]
        parent_metadata = download_metadata(parent_id)
        return recurse_tree(parent_metadata)
        return recurse_tree(parent_metadata[1], og_metadata)
    except KeyError:
        return (metadata["id"], {**metadata, "key_id": metadata["id"]})

def key_on_parent(metadata: dict) -> tuple:
    key, metadata = recurse_tree(metadata)
    return json.dumps(metadata, indent=4, sort_keys=True)

def run_hn_flow(init_item):
    flow = Dataflow()
    flow.input("in", HNInput(timedelta(seconds=15), None, init_item)) # skip the align_to argument
    flow.flat_map(lambda x: x)
    # If you run this dataflow with multiple workers, downloads in
    # the next `map` will be parallelized thanks to .redistribute()
    flow.redistribute()
    flow.map(download_metadata)
    flow.inspect(logger.debug)

    # We want to keep related data together so let's build a
    # traversal function to get the ultimate parent
    flow.map(key_on_parent)

    flow.output("out", ProtonOutput("hn",os.environ["PROTON_HOST"]))
    return flow
        return (metadata["id"],
            {
                **og_metadata,
                "root_id":metadata["id"]
            }
        )


def key_on_parent(key__metadata) -> tuple:
    key, metadata = recurse_tree(key__metadata[1])
    return (str(key), metadata)


def format(id__metadata):
    id, metadata = id__metadata
    return json.dumps(metadata)

flow = Dataflow("hn_scraper")
max_id = op.input("in", flow, HNSource(timedelta(seconds=15)))
id_stream = op.stateful_map("range", max_id, lambda: None, get_id_stream).then(
    op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1]).then(
    op.redistribute, "redist")
id_stream = op.filter_map("meta_download", id_stream, download_metadata)
split_stream = op.branch("split_comments", id_stream, lambda item: item[1]["type"] == "story")
story_stream = split_stream.trues
story_stream = op.map("format_stories", story_stream, format)
comment_stream = split_stream.falses
comment_stream = op.map("key_on_parent", comment_stream, key_on_parent)
comment_stream = op.map("format_comments", comment_stream, format)
op.inspect("stories", story_stream)
op.inspect("comments", comment_stream)
op.output("stories-out", story_stream, ProtonSink("hn_stories", os.environ["PROTON_HOST"]))
op.output("comments-out", comment_stream, ProtonSink("hn_comments", os.environ["PROTON_HOST"]))
15 changes: 8 additions & 7 deletions example/bytewax/proton.py
@@ -1,15 +1,15 @@
"""Output to Timeplus Proton."""
from bytewax.outputs import DynamicOutput, StatelessSink
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from proton_driver import client
import logging

__all__ = [
    "ProtonOutput",
    "ProtonSink",
]
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)

class _ProtonSink(StatelessSink):
class _ProtonSinkPartition(StatelessSinkPartition):
    def __init__(self, stream: str, host: str):
        self.client=client.Client(host=host, port=8463)
        self.stream=stream
@@ -18,14 +18,15 @@ def __init__(self, stream: str, host: str):
        self.client.execute(sql)

    def write_batch(self, items):
        logger.debug(f"inserting data {items}")
        rows=[]
        for item in items:
            rows.append([item]) # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        # logger.debug(f"inserting data {sql}")
        logger.debug(f"inserting data {sql}")
        self.client.execute(sql,rows)

class ProtonOutput(DynamicOutput):
class ProtonSink(DynamicSink):
    def __init__(self, stream: str, host: str):
        self.stream = stream
        self.host = host if host is not None and host != "" else "127.0.0.1"
@@ -44,4 +45,4 @@ def __init__(self, stream: str, host: str):

    def build(self, worker_index, worker_count):
        """See ABC docstring."""
        return _ProtonSink(self.stream, self.host)
        return _ProtonSinkPartition(self.stream, self.host)