Skip to content

Commit

Permalink
Expose internal code in databento example (#7463)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 30b1756cbeac8f7a4d5510ee13821d949269718a
  • Loading branch information
szymondudycz authored and Manul from Pathway committed Oct 11, 2024
1 parent 75682ec commit 95899b6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 1 deletion.
82 changes: 82 additions & 0 deletions examples/projects/option-greeks/querying.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright © 2024 Pathway


import logging
import os
import signal
import traceback
from threading import Thread

import pandas as pd
import uvicorn
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel

import pathway as pw

app = FastAPI()


class IntegratedTable:
data: dict
schema: pw.Schema

def __init__(self, data, schema):
self.data = data
self.schema = schema

def serialize(self):
return pd.DataFrame.from_records(
list(self.data.values()),
columns=list(self.schema.keys()) + ["id"],
index="id",
).to_json(default_handler=str)


registered_tables: dict[str, IntegratedTable] = {}


class RegisteredTablesResponse(BaseModel):
tables: dict[str, list]


@app.get("/get_table")
async def get_table(alias: str):
if alias in registered_tables:
return Response(
registered_tables[alias].serialize(), media_type="application/json"
)
return
else:
raise HTTPException(status_code=404, detail=f"Table `{alias}` not found")


def register_table(self: pw.Table, alias: str, *, short_pointers=True):
integrated = {}

def update(key, row, time, is_addition):
row["id"] = key
if is_addition:
integrated[key] = tuple(row.values())
else:
del integrated[key]

pw.io.subscribe(self, on_change=update) # todo: support time end
registered_tables[alias] = IntegratedTable(data=integrated, schema=self.schema)


def run_with_querying(*, pathway_kwargs={}, uvicorn_kwargs={}):
def _run():
try:
pw.run(**pathway_kwargs)
except Exception:
logging.error(traceback.format_exc())
signal.raise_signal(signal.SIGINT)

if os.environ.get("PATHWAY_PROCESS_ID", "0") == "0":
t = Thread(target=_run)
t.start()
uvicorn.run(app, loop="asyncio", **uvicorn_kwargs)
t.join()
else:
_run()
4 changes: 4 additions & 0 deletions examples/projects/option-greeks/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ pandas
scipy
pathway
python-dotenv
fastapi
pydantic
streamlit
uvicorn
2 changes: 1 addition & 1 deletion examples/projects/option-greeks/streamlit_ux.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import requests
import streamlit as st
from dotenv import load_dotenv
from querying import register_table, run_with_querying
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import pathway as pw
from common.pathway.querying import register_table, run_with_querying


def send_table_to_web(port: int, table: pw.Table, alias: str):
Expand Down

0 comments on commit 95899b6

Please sign in to comment.