diff --git a/examples/projects/option-greeks/querying.py b/examples/projects/option-greeks/querying.py new file mode 100644 index 00000000..539ba50b --- /dev/null +++ b/examples/projects/option-greeks/querying.py @@ -0,0 +1,82 @@ +# Copyright © 2024 Pathway + + +import logging +import os +import signal +import traceback +from threading import Thread + +import pandas as pd +import uvicorn +from fastapi import FastAPI, HTTPException, Response +from pydantic import BaseModel + +import pathway as pw + +app = FastAPI() + + +class IntegratedTable: + data: dict + schema: pw.Schema + + def __init__(self, data, schema): + self.data = data + self.schema = schema + + def serialize(self): + return pd.DataFrame.from_records( + list(self.data.values()), + columns=list(self.schema.keys()) + ["id"], + index="id", + ).to_json(default_handler=str) + + +registered_tables: dict[str, IntegratedTable] = {} + + +class RegisteredTablesResponse(BaseModel): + tables: dict[str, list] + + +@app.get("/get_table") +async def get_table(alias: str): + if alias in registered_tables: + return Response( + registered_tables[alias].serialize(), media_type="application/json" + ) + return + else: + raise HTTPException(status_code=404, detail=f"Table `{alias}` not found") + + +def register_table(self: pw.Table, alias: str, *, short_pointers=True): + integrated = {} + + def update(key, row, time, is_addition): + row["id"] = key + if is_addition: + integrated[key] = tuple(row.values()) + else: + del integrated[key] + + pw.io.subscribe(self, on_change=update) # todo: support time end + registered_tables[alias] = IntegratedTable(data=integrated, schema=self.schema) + + +def run_with_querying(*, pathway_kwargs={}, uvicorn_kwargs={}): + def _run(): + try: + pw.run(**pathway_kwargs) + except Exception: + logging.error(traceback.format_exc()) + signal.raise_signal(signal.SIGINT) + + if os.environ.get("PATHWAY_PROCESS_ID", "0") == "0": + t = Thread(target=_run) + t.start() + uvicorn.run(app, loop="asyncio", **uvicorn_kwargs) + t.join() + else: + _run() diff --git a/examples/projects/option-greeks/requirements.txt b/examples/projects/option-greeks/requirements.txt index 0bacc6cb..e4009f62 100644 --- a/examples/projects/option-greeks/requirements.txt +++ b/examples/projects/option-greeks/requirements.txt @@ -3,3 +3,7 @@ pandas scipy pathway python-dotenv +fastapi +pydantic +streamlit +uvicorn diff --git a/examples/projects/option-greeks/streamlit_ux.py b/examples/projects/option-greeks/streamlit_ux.py index 42c36e15..f70c874c 100644 --- a/examples/projects/option-greeks/streamlit_ux.py +++ b/examples/projects/option-greeks/streamlit_ux.py @@ -5,11 +5,11 @@ import requests import streamlit as st from dotenv import load_dotenv +from querying import register_table, run_with_querying from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import pathway as pw -from common.pathway.querying import register_table, run_with_querying def send_table_to_web(port: int, table: pw.Table, alias: str):