Fix #1191: Lake Merge Process Step-2 - GQL and GQL Dependencies (#1192)
* the legacy table class and tests are moved

* Step-1: Utils and essentials

* test and black fixes

* black and lint fixes

* test collection error

* test fixes

* lint and black fixes

* Step-2: Moving GQL

* fixes on cli and dependent classes

* fixes on tests and removed etl

* black and test issues

* etl imports temporarily removed

* Fix a couple of tests.

* Fix linters.

* issue-1191 - legacy test fixed

* linter fix

* fix linter

* issue-1191: fix conflict

* issue-1191: pylint fix

* issue-1191: black fix

* issue-1191

* issue-1191: CLI command tips

* issue-1191: CLI command tip fix

* issue-1191: prediction command error message

* lazy formatting fix

* issue-1191: CLI hints

* issue-1191: assertion fix

* README fixes

* CLI arguments are updated

---------

Co-authored-by: Calina Cenan <calina@cenan.net>
kdetry and calina-c authored Jun 17, 2024
1 parent 9c57dea commit 993ad88
Showing 64 changed files with 2,641 additions and 996 deletions.
1 change: 1 addition & 0 deletions .pylintrc
@@ -15,6 +15,7 @@ ignore=
build/,
logs/,
venv,
new_venv,
*/__pycache__,
*/*/__pycache__,
.pytest_cache,
4 changes: 2 additions & 2 deletions README.md
@@ -84,15 +84,15 @@ OPF-run bots & higher-level tools:
- `trueval` - report true values to contract
- `dfbuyer` - buy feeds on behalf of Predictoor DF
- `publisher` - publish pdr data feeds
- `analytics` - analytics tools
- `lake` - data lake and analytics tools
- `deployer` - deployer tool
- `accuracy` - calculates % correct, for display in predictoor.ai webapp

Mid-level building blocks:

- `cli` - implementation of CLI
- `ppss` - implements settings
- `lake` - data lake and data pipeline
- `ohlcv` - financial data pipeline
- `subgraph` - blockchain queries, complements lake
- `aimodel` - AI/ML modeling engine
- `accuracy` - to report % correct in webapp
2 changes: 1 addition & 1 deletion READMEs/predictoor.md
@@ -74,7 +74,7 @@ pdr sim my_ppss.yaml

What the engine does:
1. Set simulation parameters.
1. Grab historical price data from exchanges and stores in `parquet_data/` dir. It re-uses any previously saved data.
1. Grab historical price data from exchanges and stores in `lake_data/` dir. It re-uses any previously saved data.
1. Run through many 5min epochs. At each epoch:
- Build a model
- Predict
2 changes: 1 addition & 1 deletion READMEs/trader.md
@@ -66,7 +66,7 @@ pdr sim my_ppss.yaml

What the engine does:
1. Set simulation parameters.
1. Grab historical price data from exchanges and stores in `parquet_data/` dir. It re-uses any previously saved data.
1. Grab historical price data from exchanges and stores in `lake_data/` dir. It re-uses any previously saved data.
1. Run through many 5min epochs. At each epoch:
- Build a model
- Predict
5 changes: 4 additions & 1 deletion pdr_backend/accuracy/app.py
@@ -12,7 +12,10 @@
get_all_contract_ids_by_owner,
ContractIdAndSPE,
)
from pdr_backend.subgraph.subgraph_slot import fetch_slots_for_all_assets, PredictSlot
from pdr_backend.subgraph.legacy.subgraph_slot import (
fetch_slots_for_all_assets,
PredictSlot,
)
from pdr_backend.util.time_types import UnixTimeS

app = Flask(__name__)
2 changes: 1 addition & 1 deletion pdr_backend/accuracy/test/test_app.py
@@ -10,7 +10,7 @@
aggregate_statistics,
calculate_statistics_for_all_assets,
)
from pdr_backend.subgraph.subgraph_slot import PredictSlot
from pdr_backend.subgraph.legacy.subgraph_slot import PredictSlot
from pdr_backend.util.time_types import UnixTimeS

# Sample data for tests
130 changes: 68 additions & 62 deletions pdr_backend/analytics/get_predictions_info.py
@@ -2,8 +2,7 @@
from typing import List

from enforce_typing import enforce_types
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.lake.gql_data_factory import GQLDataFactory

from pdr_backend.analytics.predictoor_stats import (
get_feed_summary_stats,
get_predictoor_summary_stats,
@@ -13,67 +12,52 @@
plot_traction_cum_sum_statistics,
plot_traction_daily_statistics,
)
from pdr_backend.lake.duckdb_data_store import DuckDBDataStore
from pdr_backend.lake.table import NamedTable
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.util.time_types import UnixTimeMs

logger = logging.getLogger("get_predictions_info")


class PredFilter:
def __init__(self, ppss):
gql_data_factory = GQLDataFactory(ppss)
gql_tables = gql_data_factory.get_gql_tables()

predictions_df = gql_tables["pdr_predictions"].df
assert (
predictions_df is not None and len(predictions_df) > 0
), "Lake has no predictions."

self.predictions_df = predictions_df

def filter_timestamp(self, start_timestr, end_timestr):
predictions_df = self.predictions_df

predictions_df = predictions_df.filter(
(predictions_df["timestamp"] >= UnixTimeMs.from_timestr(start_timestr))
& (predictions_df["timestamp"] <= UnixTimeMs.from_timestr(end_timestr))
)

self.predictions_df = predictions_df

def filter_feed_addrs(self, feed_addrs):
if len(feed_addrs) <= 0:
return
feed_addrs = [f.lower() for f in feed_addrs]

predictions_df = self.predictions_df
predictions_df = predictions_df.filter(
predictions_df["ID"]
.map_elements(lambda x: x.split("-")[0].lower(), return_dtype=str)
.is_in(feed_addrs)
)
self.predictions_df = predictions_df

def filter_user_addrs(self, pdr_addrs):
if len(pdr_addrs) <= 0:
return
@enforce_types
def _address_list_to_str(addresses: List[str]) -> str:
return "(" + ", ".join([f"'{f.lower()}'" for f in addresses]) + ")"

pdr_addrs = [f.lower() for f in pdr_addrs]

predictions_df = self.predictions_df
predictions_df = predictions_df.filter(predictions_df["user"].is_in(pdr_addrs))
self.predictions_df = predictions_df
@enforce_types
def _checks_for_empty_df(df, table_name: str):
assert df is not None, f"No table found: {table_name}"
assert len(df) > 0, "No records to summarize. Please adjust params."


@enforce_types
def get_predictions_info_main(
ppss: PPSS, start_timestr: str, end_timestr: str, feed_addrs: List[str]
):
pred_filter = PredFilter(ppss)
pred_filter.filter_feed_addrs(feed_addrs)
pred_filter.filter_timestamp(start_timestr, end_timestr)
predictions_df = pred_filter.predictions_df
logger.info(
"get_predictions_info_main_ppss.lake_ss.lake_dir--- %s", ppss.lake_ss.lake_dir
)
logger.info("get_predictions_info_main start_timestr %s", start_timestr)
logger.info("get_predictions_info_main end_timestr %s", end_timestr)

table_name = NamedTable("pdr_predictions").fullname

# convert feed addresses to string for SQL query
feed_addrs_str = _address_list_to_str(feed_addrs)

query = f"""
SELECT *,
FROM {table_name}
WHERE
timestamp >= {UnixTimeMs.from_timestr(start_timestr)}
AND timestamp <= {UnixTimeMs.from_timestr(end_timestr)}
AND contract IN {feed_addrs_str}
"""

predictions_df = DuckDBDataStore(ppss.lake_ss.lake_dir).query_data(query)

assert len(predictions_df) > 0, "No records to summarize. Please adjust params."
_checks_for_empty_df(predictions_df, table_name)

feed_summary_df = get_feed_summary_stats(predictions_df)
logger.info(feed_summary_df)
@@ -83,30 +67,52 @@ def get_predictions_info_main(
def get_predictoors_info_main(
ppss: PPSS, start_timestr: str, end_timestr: str, pdr_addrs: List[str]
):
pred_filter = PredFilter(ppss)
pred_filter.filter_user_addrs(pdr_addrs)
pred_filter.filter_timestamp(start_timestr, end_timestr)
predictions_df = pred_filter.predictions_df
logger.info(
"get_predictoors_info_main_ppss.lake_ss.lake_dir--- %s", ppss.lake_ss.lake_dir
)
table_name = NamedTable("pdr_predictions").fullname

assert len(predictions_df) > 0, "No records to summarize. Please adjust params."
# convert feed addresses to string for SQL query
pdr_addrs_str = _address_list_to_str(pdr_addrs)

query = f"""
SELECT *,
FROM {table_name}
WHERE
timestamp >= {UnixTimeMs.from_timestr(start_timestr)}
AND timestamp <= {UnixTimeMs.from_timestr(end_timestr)}
AND user IN {pdr_addrs_str}
"""

predictions_df = DuckDBDataStore(ppss.lake_ss.lake_dir).query_data(query)

_checks_for_empty_df(predictions_df, table_name)

predictoor_summary_df = get_predictoor_summary_stats(predictions_df)
print(predictoor_summary_df)
logger.info(predictoor_summary_df)


@enforce_types
def get_traction_info_main(ppss: PPSS, start_timestr: str, end_timestr: str):
pred_filter = PredFilter(ppss)
pred_filter.filter_timestamp(start_timestr, end_timestr)
predictions_df = pred_filter.predictions_df
table_name = NamedTable("pdr_predictions").fullname

query = f"""
SELECT *,
FROM {table_name}
WHERE
timestamp >= {UnixTimeMs.from_timestr(start_timestr)}
AND timestamp <= {UnixTimeMs.from_timestr(end_timestr)}
"""

predictions_df = DuckDBDataStore(ppss.lake_ss.lake_dir).query_data(query)

assert len(predictions_df) > 0, "No records to summarize. Please adjust params."
_checks_for_empty_df(predictions_df, table_name)

# calculate predictoor traction statistics and draw plots
stats_df = get_traction_statistics(predictions_df)
plot_traction_cum_sum_statistics(stats_df, ppss.lake_ss.parquet_dir)
plot_traction_daily_statistics(stats_df, ppss.lake_ss.parquet_dir)
plot_traction_cum_sum_statistics(stats_df, ppss.lake_ss.lake_dir)
plot_traction_daily_statistics(stats_df, ppss.lake_ss.lake_dir)

# calculate slot statistics and draw plots
slots_df = get_slot_statistics(predictions_df)
plot_slot_daily_statistics(slots_df, ppss.lake_ss.parquet_dir)
plot_slot_daily_statistics(slots_df, ppss.lake_ss.lake_dir)
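
The refactor above replaces the in-memory PredFilter class with SQL filtering against the DuckDB lake. Below is a minimal stand-alone sketch of that pattern, using the plain duckdb package instead of the repo's DuckDBDataStore wrapper; the db_path argument, the example file path, and the bare pdr_predictions table name are illustrative assumptions, not the repo's actual lake layout.

from typing import List

import duckdb  # assumed installed; the repo wraps it in DuckDBDataStore


def fetch_predictions(
    db_path: str, start_ms: int, end_ms: int, feed_addrs: List[str]
):
    """Return filtered pdr_predictions rows as a polars DataFrame."""
    # Same address-list formatting as _address_list_to_str() above.
    addrs_str = "(" + ", ".join(f"'{a.lower()}'" for a in feed_addrs) + ")"
    query = f"""
        SELECT *
        FROM pdr_predictions
        WHERE timestamp >= {start_ms}
          AND timestamp <= {end_ms}
          AND contract IN {addrs_str}
    """
    con = duckdb.connect(db_path, read_only=True)
    try:
        # Analogous to DuckDBDataStore(...).query_data(query)
        return con.execute(query).pl()
    finally:
        con.close()


# Illustrative usage (hypothetical path, timestamps, and addresses):
# df = fetch_predictions(
#     "lake_data/duckdb.db", 1706745600000, 1709251200000, ["0xabc...", "0xdef..."]
# )
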
16 changes: 8 additions & 8 deletions pdr_backend/analytics/predictoor_stats.py
@@ -34,19 +34,19 @@ class PredictoorStat(TypedDict):
def get_feed_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
# 1 - filter from lake only the rows that you're looking for
df = predictions_df.filter(
~((pl.col("trueval").is_null()) | (pl.col("payout").is_null()))
~((pl.col("truevalue").is_null()) | (pl.col("payout").is_null()))
)

df = df.with_columns(
pl.col("prediction").eq(pl.col("trueval")).cast(pl.UInt8).alias("is_correct")
pl.col("predvalue").eq(pl.col("truevalue")).cast(pl.UInt8).alias("is_correct")
)
# Group by pair
df = df.group_by(["pair", "timeframe"]).agg(
pl.col("source").first().alias("source"),
pl.col("payout").sum().alias("sum_payout"),
pl.col("stake").sum().alias("sum_stake"),
pl.col("prediction").count().alias("num_predictions"),
(pl.col("is_correct").sum() / pl.col("pair").count() * 100).alias("accuracy"),
pl.col("predvalue").count().alias("num_predictions"),
(pl.col("predvalue").sum() / pl.col("pair").count() * 100).alias("accuracy"),
)

return df
Expand All @@ -56,19 +56,19 @@ def get_feed_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
def get_predictoor_summary_stats(predictions_df: pl.DataFrame) -> pl.DataFrame:
# 1 - filter from lake only the rows that you're looking for
df = predictions_df.filter(
~((pl.col("trueval").is_null()) | (pl.col("payout").is_null()))
~((pl.col("truevalue").is_null()) | (pl.col("payout").is_null()))
)

df = df.with_columns(
pl.col("prediction").eq(pl.col("trueval")).cast(pl.UInt8).alias("is_correct")
pl.col("predvalue").eq(pl.col("truevalue")).cast(pl.UInt8).alias("is_correct")
)
# Group by pair
df = df.group_by(["user", "pair", "timeframe"]).agg(
pl.col("source").first().alias("source"),
pl.col("payout").sum().alias("sum_payout"),
pl.col("stake").sum().alias("sum_stake"),
pl.col("prediction").count().alias("num_predictions"),
(pl.col("is_correct").sum() / pl.col("pair").count() * 100).alias("accuracy"),
pl.col("predvalue").count().alias("num_predictions"),
(pl.col("predvalue").sum() / pl.col("pair").count() * 100).alias("accuracy"),
)

return df
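
For reference, here is a self-contained polars sketch of the summary aggregation above, run on toy data with the renamed predvalue/truevalue columns; it keeps the is_correct-based accuracy from the pre-rename version, and all values are purely illustrative.

import polars as pl

# Toy predictions frame using the new predvalue/truevalue column names.
df = pl.DataFrame(
    {
        "pair": ["BTC/USDT", "BTC/USDT", "ETH/USDT"],
        "timeframe": ["5m", "5m", "5m"],
        "source": ["binance", "binance", "binance"],
        "predvalue": [True, False, True],
        "truevalue": [True, True, True],
        "stake": [1.0, 2.0, 0.5],
        "payout": [1.9, 0.0, 0.95],
    }
)

# Drop rows with missing truevalue/payout, then flag correct predictions.
df = df.filter(~(pl.col("truevalue").is_null() | pl.col("payout").is_null()))
df = df.with_columns(
    pl.col("predvalue").eq(pl.col("truevalue")).cast(pl.UInt8).alias("is_correct")
)

summary = df.group_by(["pair", "timeframe"]).agg(
    pl.col("source").first().alias("source"),
    pl.col("payout").sum().alias("sum_payout"),
    pl.col("stake").sum().alias("sum_stake"),
    pl.col("predvalue").count().alias("num_predictions"),
    (pl.col("is_correct").sum() / pl.col("pair").count() * 100).alias("accuracy"),
)
print(summary)
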
23 changes: 5 additions & 18 deletions pdr_backend/analytics/test/conftest.py
@@ -1,15 +1,13 @@
import pytest
import polars as pl
import pytest

from pdr_backend.subgraph.legacy.prediction import (
from pdr_backend.lake.plutil import _object_list_to_df
from pdr_backend.lake.prediction import (
mock_daily_predictions,
mock_first_predictions,
mock_second_predictions,
)

from pdr_backend.lake.legacy.plutil import _object_list_to_df
from pdr_backend.lake.table_pdr_predictions import predictions_schema


@pytest.fixture()
def _sample_first_predictions():
@@ -29,18 +27,7 @@ def _sample_daily_predictions():
@pytest.fixture()
def _gql_datafactory_first_predictions_df():
_predictions = mock_first_predictions()
predictions_df = _object_list_to_df(_predictions, predictions_schema)
predictions_df = predictions_df.with_columns(
[pl.col("timestamp").mul(1000).alias("timestamp")]
)

return predictions_df


@pytest.fixture()
def _gql_datafactory_second_predictions_df():
_predictions = mock_second_predictions()
predictions_df = _object_list_to_df(_predictions, predictions_schema)
predictions_df = _object_list_to_df(_predictions)
predictions_df = predictions_df.with_columns(
[pl.col("timestamp").mul(1000).alias("timestamp")]
)
@@ -51,7 +38,7 @@ def _gql_datafactory_second_predictions_df():
@pytest.fixture()
def _gql_datafactory_daily_predictions_df():
_predictions = mock_daily_predictions()
predictions_df = _object_list_to_df(_predictions, predictions_schema)
predictions_df = _object_list_to_df(_predictions)
predictions_df = predictions_df.with_columns(
[pl.col("timestamp").mul(1000).alias("timestamp")]
)
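
The conftest change above drops the explicit predictions_schema argument and scales mock timestamps from seconds to milliseconds. A rough sketch of that fixture shape, with hypothetical stand-ins for the repo's mock_first_predictions and _object_list_to_df helpers:

import polars as pl
import pytest


def mock_first_predictions():
    # Hypothetical stand-in; the real helper lives in pdr_backend.lake.prediction.
    return [
        {"timestamp": 1718600000, "pair": "BTC/USDT", "predvalue": True, "truevalue": True}
    ]


def _object_list_to_df(objs):
    # Hypothetical stand-in; after this change the real helper infers the schema itself.
    return pl.DataFrame(objs)


@pytest.fixture()
def _gql_datafactory_first_predictions_df():
    predictions_df = _object_list_to_df(mock_first_predictions())
    # Lake tables store timestamps in milliseconds; the mocks use seconds.
    return predictions_df.with_columns(
        [pl.col("timestamp").mul(1000).alias("timestamp")]
    )
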