-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Lake] integrate pdr_subscriptions into GQL Data Factory (#469)
* first commit for subscriptions * hook up pdr_subscriptions to gql_factory * Tests passing, expanding tests to support multiple tables * Adding tests and improving handling of empty parquet files * Subscriptions test * Updating logic to use predictSubscriptions, take lastPriceValue, and to not query the subgraph more than needed. * Moving models from contract/ -> subgraph/ * Fixing pylint * fixing tests * adding @enforce_types
- Loading branch information
1 parent
bd01d71
commit a0244c9
Showing
18 changed files
with
813 additions
and
36 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from typing import Dict | ||
|
||
import polars as pl | ||
from enforce_typing import enforce_types | ||
from polars import Int64, Utf8, Float32 | ||
|
||
from pdr_backend.subgraph.subgraph_subscriptions import ( | ||
fetch_filtered_subscriptions, | ||
) | ||
from pdr_backend.lake.table_pdr_predictions import _transform_timestamp_to_ms | ||
from pdr_backend.lake.plutil import _object_list_to_df | ||
from pdr_backend.util.networkutil import get_sapphire_postfix | ||
from pdr_backend.util.timeutil import ms_to_seconds | ||
|
||
|
||
# RAW PREDICTOOR SUBSCRIPTIONS SCHEMA | ||
subscriptions_schema = { | ||
"ID": Utf8, | ||
"pair": Utf8, | ||
"timeframe": Utf8, | ||
"source": Utf8, | ||
"tx_id": Utf8, | ||
"last_price_value": Float32, | ||
"timestamp": Int64, | ||
"user": Utf8, | ||
} | ||
|
||
|
||
@enforce_types | ||
def get_pdr_subscriptions_df( | ||
network: str, st_ut: int, fin_ut: int, config: Dict | ||
) -> pl.DataFrame: | ||
""" | ||
@description | ||
Fetch raw subscription events from predictoor subgraph | ||
Update function for graphql query, returns raw data | ||
+ Transforms ts into ms as required for data factory | ||
""" | ||
network = get_sapphire_postfix(network) | ||
|
||
# fetch subscriptions | ||
subscriptions = fetch_filtered_subscriptions( | ||
ms_to_seconds(st_ut), ms_to_seconds(fin_ut), config["contract_list"], network | ||
) | ||
|
||
if len(subscriptions) == 0: | ||
print(" No subscriptions fetched. Exit.") | ||
return pl.DataFrame() | ||
|
||
# convert subscriptions to df and transform timestamp into ms | ||
subscriptions_df = _object_list_to_df(subscriptions, subscriptions_schema) | ||
subscriptions_df = _transform_timestamp_to_ms(subscriptions_df) | ||
|
||
# cull any records outside of our time range and sort them by timestamp | ||
subscriptions_df = subscriptions_df.filter( | ||
pl.col("timestamp").is_between(st_ut, fin_ut) | ||
).sort("timestamp") | ||
|
||
return subscriptions_df |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,14 @@ | ||
import pytest | ||
|
||
from pdr_backend.contract.prediction import mock_daily_predictions | ||
from pdr_backend.subgraph.prediction import mock_daily_predictions | ||
from pdr_backend.subgraph.subscription import mock_subscriptions | ||
|
||
|
||
@pytest.fixture() | ||
def sample_daily_predictions(): | ||
return mock_daily_predictions() | ||
|
||
|
||
@pytest.fixture() | ||
def sample_subscriptions(): | ||
return mock_subscriptions() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.