Skip to content

Commit

Permalink
Towards #494: Improve coverage 2 (#498)
Browse files Browse the repository at this point in the history
* Add some coverage to dfbuyer agent.
* Add dfbuyer and ppss coverage.
* Add predictoor and sim coverage.
* Add coverage to util.
* Add some trueval coverage.
* Add coverage to trader agents.
* Add coverage to portfolio.
* Add coverage to subgraph consume_so_far and fix an infinite loop bug.
* More subgraph coverage.
  • Loading branch information
calina-c authored Jan 11, 2024
1 parent 4959b5d commit 474ec9c
Show file tree
Hide file tree
Showing 40 changed files with 608 additions and 113 deletions.
75 changes: 42 additions & 33 deletions pdr_backend/dfbuyer/dfbuyer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ def __init__(self, ppss: PPSS):
def run(self, testing: bool = False):
if not self.feeds:
return

while True:
ts = self.ppss.web3_pp.web3_config.get_block("latest")["timestamp"]
ts = self.ppss.web3_pp.web3_config.get_current_timestamp()
self.take_step(ts)

if testing:
Expand All @@ -75,6 +76,7 @@ def run(self, testing: bool = False):
def take_step(self, ts: int):
if not self.feeds:
return

print("Taking step for timestamp:", ts)
wait_until_subgraph_syncs(
self.ppss.web3_pp.web3_config, self.ppss.web3_pp.subgraph_url
Expand Down Expand Up @@ -107,17 +109,18 @@ def take_step(self, ts: int):
print("Sleeping for a minute and trying again")
time.sleep(60)
return

self.fail_counter = 0
self._sleep_until_next_consume_interval()

def _sleep_until_next_consume_interval(self):
# sleep until next consume interval
ts = self.ppss.web3_pp.web3_config.get_block("latest")["timestamp"]
interval_start = (
int(ts / self.ppss.dfbuyer_ss.consume_interval_seconds)
* self.ppss.dfbuyer_ss.consume_interval_seconds
)
seconds_left = (
(interval_start + self.ppss.dfbuyer_ss.consume_interval_seconds) - ts + 60
)
ts = self.ppss.web3_pp.web3_config.get_current_timestamp()
consume_interval_seconds = self.ppss.dfbuyer_ss.consume_interval_seconds

interval_start = int(ts / consume_interval_seconds) * consume_interval_seconds
seconds_left = (interval_start + consume_interval_seconds) - ts + 60

print(
f"-- Sleeping for {seconds_left} seconds until next consume interval... --"
)
Expand All @@ -135,14 +138,11 @@ def _get_missing_consumes(self, ts: int) -> Dict[str, float]:
actual_consumes = self._get_consume_so_far(ts)
expected_consume_per_feed = self._get_expected_amount_per_feed(ts)

missing_consumes_amt: Dict[str, float] = {}

for address in self.feeds:
missing = expected_consume_per_feed - actual_consumes[address]
if missing > 0:
missing_consumes_amt[address] = missing

return missing_consumes_amt
return {
address: expected_consume_per_feed - actual_consumes[address]
for address in self.feeds
if expected_consume_per_feed > actual_consumes[address]
}

def _prepare_batches(
self, consume_times: Dict[str, int]
Expand All @@ -153,6 +153,7 @@ def _prepare_batches(
batches: List[Tuple[List[str], List[int]]] = []
addresses_to_consume: List[str] = []
times_to_consume: List[int] = []

for address, times in consume_times.items():
while times > 0:
current_times_to_consume = min(
Expand All @@ -172,6 +173,7 @@ def _prepare_batches(
batches.append((addresses_to_consume, times_to_consume))
addresses_to_consume = []
times_to_consume = []

return batches

def _consume(self, addresses_to_consume, times_to_consume):
Expand All @@ -184,17 +186,21 @@ def _consume(self, addresses_to_consume, times_to_consume):
True,
)
tx_hash = tx["transactionHash"].hex()

if tx["status"] != 1:
print(f" Tx reverted: {tx_hash}")
return False

print(f" Tx sent: {tx_hash}")
return True
except Exception as e:
print(f" Attempt {i+1} failed with error: {e}")
time.sleep(1)

if i == 4:
print(" Failed to consume contracts after 5 attempts.")
raise

return False

def _consume_batch(self, addresses_to_consume, times_to_consume) -> bool:
Expand All @@ -213,13 +219,16 @@ def _consume_batch(self, addresses_to_consume, times_to_consume) -> bool:
for address, times in zip(addresses_to_consume, times_to_consume):
if self._consume([address], [times]):
continue # If successful, continue to the next address

# If individual consumption fails, split the consumption into two parts
half_time = times // 2

if half_time > 0:
print(f" Consuming {address} for {half_time} times")
if not self._consume([address], [half_time]):
print("Transaction reverted again, please adjust batch size")
one_or_more_failed = True

remaining_times = times - half_time
if remaining_times > 0:
print(f" Consuming {address} for {remaining_times} times")
Expand All @@ -229,24 +238,27 @@ def _consume_batch(self, addresses_to_consume, times_to_consume) -> bool:
else:
print(f" Unable to consume {address} for {times} times")
one_or_more_failed = True

return one_or_more_failed

def _batch_txs(self, consume_times: Dict[str, int]) -> bool:
batches = self._prepare_batches(consume_times)
print(f"Processing {len(batches)} batches...")
one_or_more_failed = False

failures = 0

for addresses_to_consume, times_to_consume in batches:
failed = self._consume_batch(addresses_to_consume, times_to_consume)
if failed:
one_or_more_failed = True
return one_or_more_failed
failures += int(self._consume_batch(addresses_to_consume, times_to_consume))

return bool(failures)

def _get_prices(self, contract_addresses: List[str]) -> Dict[str, float]:
prices: Dict[str, float] = {}
for address in contract_addresses:
rate_wei = PredictoorContract(self.ppss.web3_pp, address).get_price()
prices[address] = from_wei(rate_wei)
return prices
return {
address: from_wei(
PredictoorContract(self.ppss.web3_pp, address).get_price()
)
for address in contract_addresses
}

def _get_consume_so_far(self, ts: int) -> Dict[str, float]:
week_start = (math.floor(ts / WEEK)) * WEEK
Expand All @@ -259,15 +271,12 @@ def _get_consume_so_far(self, ts: int) -> Dict[str, float]:
return consume_so_far

def _get_expected_amount_per_feed(self, ts: int):
amount_per_feed_per_interval = self.ppss.dfbuyer_ss.amount_per_interval / len(
self.feeds
)
ss = self.ppss.dfbuyer_ss
amount_per_feed_per_interval = ss.amount_per_interval / len(self.feeds)
week_start = (math.floor(ts / WEEK)) * WEEK
time_passed = ts - week_start

# find out how many intervals has passed
n_intervals = (
int(time_passed / self.ppss.dfbuyer_ss.consume_interval_seconds) + 1
)
n_intervals = int(time_passed / ss.consume_interval_seconds) + 1

return n_intervals * amount_per_feed_per_interval
43 changes: 43 additions & 0 deletions pdr_backend/dfbuyer/test/test_dfbuyer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ def test_dfbuyer_agent_constructor( # pylint: disable=unused-argument
)


@enforce_types
def test_dfbuyer_agent_constructor_empty():
    # DFBuyerAgent must refuse to construct when feed filtering yields nothing.
    ppss = MagicMock(spec=PPSS)
    ppss.web3_pp = MagicMock(spec=Web3PP)
    ppss.web3_pp.query_feed_contracts.return_value = {}
    ppss.dfbuyer_ss = MagicMock(spec=DFBuyerSS)
    ppss.dfbuyer_ss.filter_feeds_from_candidates.return_value = {}

    with pytest.raises(ValueError, match="No feeds found"):
        DFBuyerAgent(ppss)


@enforce_types
def test_dfbuyer_agent_get_expected_amount_per_feed(mock_dfbuyer_agent):
ts = 1695211135
Expand Down Expand Up @@ -207,6 +220,10 @@ def test_dfbuyer_agent_take_step(
mock_get_block.assert_called_once_with("latest")
mock_sleep.assert_called_once_with(86400 - 60)

# empty feeds
mock_dfbuyer_agent.feeds = []
assert mock_dfbuyer_agent.take_step(ts) is None


@enforce_types
@patch.object(DFBuyerAgent, "take_step")
Expand All @@ -217,6 +234,10 @@ def test_dfbuyer_agent_run(mock_get_block, mock_take_step, mock_dfbuyer_agent):
mock_get_block.assert_called_once_with("latest")
mock_take_step.assert_called_once_with(mock_get_block.return_value["timestamp"])

# empty feeds
mock_dfbuyer_agent.feeds = []
assert mock_dfbuyer_agent.run(testing=True) is None


@enforce_types
@patch(f"{PATH}.time.sleep", return_value=None)
Expand Down Expand Up @@ -263,3 +284,25 @@ def test_dfbuyer_agent_consume_batch_method(mock_dfbuyer_agent):
),
]
mock_consume.assert_has_calls(calls)


@enforce_types
def test_dfbuyer_agent_batch_txs(mock_dfbuyer_agent):
    # Build six distinct fake feed addresses by replacing the tail of ZERO_ADDRESS.
    addresses = []
    for i in range(1, 7):
        suffix = str(i)
        addresses.append(ZERO_ADDRESS[: -len(suffix)] + suffix)
    consume_times = dict(zip(addresses, [10, 30, 14, 6, 24, 16]))

    # Mixed batch outcomes: any single failure makes _batch_txs report failure.
    with patch.object(
        mock_dfbuyer_agent,
        "_consume_batch",
        side_effect=[False, True, False, True, True],
    ):
        assert mock_dfbuyer_agent._batch_txs(consume_times)

    # All batches failing must also report failure.
    with patch.object(
        mock_dfbuyer_agent, "_consume_batch", side_effect=[True] * 5
    ):
        assert mock_dfbuyer_agent._batch_txs(consume_times)
4 changes: 2 additions & 2 deletions pdr_backend/ppss/sim_ss.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def __init__(self, d: dict):
print(f"Could not find log dir, creating one at: {self.log_dir}")
os.makedirs(self.log_dir)

if not (0 < self.test_n < np.inf): # pylint: disable=superfluous-parens
raise ValueError(f"test_n={self.test_n}, must be >0 and <inf")
if not (0 < int(self.test_n) < np.inf): # pylint: disable=superfluous-parens
raise ValueError(f"test_n={self.test_n}, must be an int >0 and <inf")

# --------------------------------
# properties direct from yaml dict
Expand Down
1 change: 1 addition & 0 deletions pdr_backend/ppss/test/test_aimodel_ss.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def test_aimodel_ss_happy1():
"input_feeds": ["kraken ETH/USDT hc", "binanceus ETH/USDT,TRX/DAI h"],
}
ss = AimodelSS(d)
assert isinstance(ss.copy(), AimodelSS)

# yaml properties
assert ss.feeds_strs == ["kraken ETH/USDT hc", "binanceus ETH/USDT,TRX/DAI h"]
Expand Down
2 changes: 2 additions & 0 deletions pdr_backend/ppss/test/test_lake_ss.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def test_lake_ss_basic():
# test str
assert "LakeSS" in str(ss)

assert isinstance(ss.copy(), LakeSS)


@enforce_types
def test_lake_ss_now():
Expand Down
16 changes: 16 additions & 0 deletions pdr_backend/ppss/test/test_sim_ss.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import os
import pytest

from enforce_typing import enforce_types

Expand All @@ -21,6 +22,21 @@ def test_sim_ss():
assert "SimSS" in str(ss)


@enforce_types
def test_sim_ss_bad():
    # Each invalid test_n value (negative, non-numeric) must make SimSS raise.
    for bad_test_n in (-3, "lit"):
        d = copy.deepcopy(_D)
        d["test_n"] = bad_test_n

        with pytest.raises(ValueError):
            SimSS(d)


@enforce_types
def test_log_dir(tmpdir):
# rel path given; needs an abs path
Expand Down
2 changes: 2 additions & 0 deletions pdr_backend/ppss/test/test_web3_pp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
from enforce_typing import enforce_types
from eth_account.signers.local import LocalAccount
from web3 import Web3

from pdr_backend.contract.predictoor_contract import mock_predictoor_contract
Expand Down Expand Up @@ -57,6 +58,7 @@ def test_web3_pp__yaml_dict(monkeypatch):
assert pp.rpc_url == "rpc url 1"
assert pp.subgraph_url == "subgraph url 1"
assert pp.owner_addrs == "0xOwner1"
assert isinstance(pp.account, LocalAccount)

# network2
pp2 = Web3PP(_D, "network2")
Expand Down
29 changes: 29 additions & 0 deletions pdr_backend/predictoor/approach1/test/test_predictoor_agent1.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
from unittest.mock import MagicMock

import pytest
from enforce_typing import enforce_types

from pdr_backend.ppss.ppss import PPSS
from pdr_backend.ppss.predictoor_ss import PredictoorSS
from pdr_backend.ppss.web3_pp import Web3PP
from pdr_backend.predictoor.approach1.predictoor_agent1 import PredictoorAgent1
from pdr_backend.predictoor.test.predictoor_agent_runner import run_agent_test


def test_predictoor_agent1(tmpdir, monkeypatch):
    # End-to-end smoke test via the shared predictoor-agent test runner.
    workdir = str(tmpdir)
    run_agent_test(workdir, monkeypatch, PredictoorAgent1)


def test_run():
    # Smoke test: run() on a fully-mocked agent must complete without raising.
    agent = MagicMock(spec=PredictoorAgent1)
    agent.take_step.return_value = None

    agent.run()


@enforce_types
def test_agent_constructor_empty():
    # PredictoorAgent1 must refuse to construct when no feed is selected.
    ppss = MagicMock(spec=PPSS)
    ppss.web3_pp = MagicMock(spec=Web3PP)
    ppss.web3_pp.query_feed_contracts.return_value = {}
    ppss.predictoor_ss = MagicMock(spec=PredictoorSS)
    ppss.predictoor_ss.get_feed_from_candidates.return_value = None

    with pytest.raises(ValueError, match="No feeds found"):
        PredictoorAgent1(ppss)
2 changes: 1 addition & 1 deletion pdr_backend/predictoor/base_predictoor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def __init__(self, ppss: PPSS):
print_feeds(cand_feeds, f"cand feeds, owner={ppss.web3_pp.owner_addrs}")

feed = ppss.predictoor_ss.get_feed_from_candidates(cand_feeds)
print_feeds({feed.address: feed}, "filtered feeds")
if not feed:
raise ValueError("No feeds found.")

print_feeds({feed.address: feed}, "filtered feed")
self.feed = feed

contracts = ppss.web3_pp.get_contracts([feed.address])
Expand Down
2 changes: 1 addition & 1 deletion pdr_backend/sim/test/test_sim_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_sim_engine(tmpdir):
assert hasattr(ppss, "sim_ss")
ppss.sim_ss = SimSS(
{
"do_plot": False,
"do_plot": True,
"log_dir": os.path.join(tmpdir, "logs"),
"test_n": 10,
}
Expand Down
3 changes: 2 additions & 1 deletion pdr_backend/subgraph/subgraph_consume_so_far.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def get_consume_so_far_per_contract(
query = """
{
predictContracts(first:1000, where: {id_in: %s}){
id
id
token{
id
name
Expand Down Expand Up @@ -66,6 +66,7 @@ def get_consume_so_far_per_contract(
for contract in contracts:
contract_address = contract["id"]
if contract_address not in contract_addresses:
no_of_zeroes += 1
continue
order_count = len(contract["token"]["orders"])
if order_count == 0:
Expand Down
Loading

0 comments on commit 474ec9c

Please sign in to comment.