Skip to content

Commit

Permalink
feat: make partner containers less verbose
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Sep 7, 2023
1 parent bad13d3 commit 435327a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion scripts/partners_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@

def main():
start = datetime.now()
process_partners(partners)
process_partners(partners, verbose=True)
logger.info(f"Partners summary completed in {datetime.now() - start}")
4 changes: 2 additions & 2 deletions scripts/utils/partners_cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

def main():
start = time()
clean_df = process_partners(partners,False)
clean_df = process_partners(partners, use_postgres_cache=False, verbose=True)
logger.info('clean df:')
logger.info(clean_df)
logger.info(f'took {time() - start}s')

start = time()
cached_df = process_partners(partners,True)
cached_df = process_partners(partners, use_postgres_cache=False, verbose=True)
logger.info('cached df:')
logger.info(cached_df)
logger.info(f'took {time() - start}s')
Expand Down
22 changes: 12 additions & 10 deletions yearn/partners/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pandas.core.tools.datetimes import DatetimeScalar
from pony.orm import OperationalError, commit, db_session
from rich import print
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from web3._utils.abi import filter_by_name
from web3._utils.events import construct_event_topic_set
from y import ERC20, Contract, get_price
Expand Down Expand Up @@ -78,11 +78,12 @@ def vault_contract(self) -> Contract:
def scale(self) -> int:
return self._vault.scale

async def get_data(self, partner: "Partner", use_postgres_cache: bool) -> DataFrame:
logger.info(f'starting to process {partner.name} {self} {self.name} {self.vault} {self.wrapper}')
async def get_data(self, partner: "Partner", use_postgres_cache: bool, verbose: bool = False) -> DataFrame:
# NOTE: We need to use a semaphore here to ensure at most one active coroutine is populating the db for this wrapper
async with self.semaphore:
if use_postgres_cache:
if verbose:
logger.info(f'starting {partner.name} {self} {self.name} {self.wrapper} for {self.vault}')
if use_postgres_cache:
cache = await threads.run(self.read_cache)
try:
max_cached_block = int(cache['block'].max())
Expand Down Expand Up @@ -441,16 +442,18 @@ async def flat_wrappers(self) -> List[Wrapper]:
flat_wrappers.extend(await wrapper.unwrap())
return flat_wrappers

async def process(self, use_postgres_cache: bool = USE_POSTGRES_CACHE) -> Tuple[DataFrame,DataFrame]:
async def process(self, use_postgres_cache: bool = USE_POSTGRES_CACHE, verbose: bool = False) -> Tuple[DataFrame,DataFrame]:
# TODO Optimize this a bit better.
# snapshot wrapper share at each harvest
wrappers = []
data = await asyncio.gather(*[wrapper.get_data(self, use_postgres_cache) for wrapper in await self.flat_wrappers])
gather = tqdm_asyncio.gather if verbose else asyncio.gather
data = await gather(*[wrapper.get_data(self, use_postgres_cache, verbose=verbose) for wrapper in await self.flat_wrappers])
for wrapper, data in zip(await self.flat_wrappers, data):
try:
data = data.set_index('block')
except KeyError:
logger.info('no fees for %s', wrapper.name)
if verbose:
logger.info('no fees for %s', wrapper.name)
continue

# TODO: save a csv for reporting
Expand Down Expand Up @@ -532,8 +535,7 @@ def export_payouts(self, partner: DataFrame) -> DataFrame:
payouts.to_csv(Path(f'research/partners/{self.name}/payouts_{Network.label()}.csv'), index=False)
return payouts


def process_partners(partners: List[Partner], use_postgres_cache: bool = USE_POSTGRES_CACHE) -> DataFrame:
def process_partners(partners: List[Partner], use_postgres_cache: bool = USE_POSTGRES_CACHE, verbose: bool = False) -> DataFrame:
if not partners:
raise UnsupportedNetwork(f'There are no partners on {Network.label()}')

Expand All @@ -545,7 +547,7 @@ def process_partners(partners: List[Partner], use_postgres_cache: bool = USE_POS
logger.warn('To enable caching without running the exporter, run `make postgres` from project root.')

partners_data: List[Tuple[DataFrame, DataFrame]] = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*[partner.process(use_postgres_cache=use_postgres_cache) for partner in partners])
tqdm_asyncio.gather(*[partner.process(use_postgres_cache=use_postgres_cache, verbose=verbose) for partner in partners])
)
for partner, (result, payout) in zip(partners, partners_data):
if len(result) == len(payout) == 0:
Expand Down

0 comments on commit 435327a

Please sign in to comment.