temp: debugging semaphore
BobTheBuidler committed Aug 1, 2023
1 parent 8c1b205 commit 33c2d5a
Showing 1 changed file with 28 additions and 26 deletions.
54 changes: 28 additions & 26 deletions yearn/partners/snapshot.py
@@ -333,6 +333,7 @@ async def unwrap(self) -> List[Wrapper]:
         ]
 
 mem_semaphore = asyncio.Semaphore(5) # Some arbitrary number, we can play with this if ram use still too high
+debugging_semaphore = asyncio.Semaphore(1)
 
 @dataclass
 class Partner:
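
For context, here is a minimal standalone sketch, not part of the commit, of why a semaphore initialized to 1 serializes work: only one coroutine can hold it at a time, so bodies guarded by async with run strictly one after another, which is what debugging_semaphore above is for.

import asyncio

async def worker(n: int, sem: asyncio.Semaphore) -> None:
    # With a semaphore created as Semaphore(1), at most one worker is
    # inside this block at any moment, mirroring debugging_semaphore.
    async with sem:
        print(f"worker {n} start")
        await asyncio.sleep(0.1)
        print(f"worker {n} done")

async def main() -> None:
    sem = asyncio.Semaphore(1)
    # All three workers are scheduled concurrently, but their bodies
    # execute one at a time.
    await asyncio.gather(*(worker(i, sem) for i in range(3)))

asyncio.run(main())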
@@ -371,33 +372,34 @@ async def process(self, use_postgres_cache: bool = USE_POSTGRES_CACHE) -> Tuple[
         # snapshot wrapper share at each harvest
         wrappers = []
         for wrapper in tqdm(await self.flat_wrappers, self.name):
-            # NOTE: We need to use a semaphore here to ensure at most one active coroutine is populating the db for this wrapper
-            async with wrapper.semaphore:
-                # NOTE: We then use this additional semaphore for memory conservation purposes.
-                # We use a separate semaphore because the rationale for each is different and we don't want to accidentally refactor it away.
-                #async with mem_semaphore:
-                if use_postgres_cache:
-                    cache = await threads.run(wrapper.read_cache)
-                    try:
-                        max_cached_block = int(cache['block'].max())
-                        logger.debug('%s %s is cached thru block %s', self.name, wrapper.name, max_cached_block)
-                        start_block = max_cached_block + 1 if self.start_block is None else max(self.start_block, max_cached_block + 1)
-                    except KeyError:
-                        start_block = self.start_block
-                        logger.debug('no harvests cached for %s %s', self.name, wrapper.name)
-                    logger.debug(f'start block: {start_block}')
-                else:
-                    start_block = self.start_block
+            async with debugging_semaphore: # NOTE: debugging purposes only, delete later
+                # NOTE: We need to use a semaphore here to ensure at most one active coroutine is populating the db for this wrapper
+                async with wrapper.semaphore:
+                    # NOTE: We then use this additional semaphore for memory conservation purposes.
+                    # We use a separate semaphore because the rationale for each is different and we don't want to accidentally refactor it away.
+                    #async with mem_semaphore:
+                    if use_postgres_cache:
+                        cache = await threads.run(wrapper.read_cache)
+                        try:
+                            max_cached_block = int(cache['block'].max())
+                            logger.debug('%s %s is cached thru block %s', self.name, wrapper.name, max_cached_block)
+                            start_block = max_cached_block + 1 if self.start_block is None else max(self.start_block, max_cached_block + 1)
+                        except KeyError:
+                            start_block = self.start_block
+                            logger.debug('no harvests cached for %s %s', self.name, wrapper.name)
+                        logger.debug(f'start block: {start_block}')
+                    else:
+                        start_block = self.start_block
 
-                futs = []
-                async for protocol_fees in wrapper.protocol_fees(start_block=start_block):
-                    futs.append(asyncio.create_task(process_harvests(wrapper, protocol_fees)))
-                data = [data for data in await asyncio.gather(*futs) if data is not None]
-                wrap = pd.concat(data) if data else DataFrame()
-
-                if use_postgres_cache:
-                    await threads.run(cache_data, wrap)
-                    wrap = pd.concat([cache, wrap])
+                    futs = []
+                    async for protocol_fees in wrapper.protocol_fees(start_block=start_block):
+                        futs.append(asyncio.create_task(process_harvests(wrapper, protocol_fees)))
+                    data = [data for data in await asyncio.gather(*futs) if data is not None]
+                    wrap = pd.concat(data) if data else DataFrame()
+
+                    if use_postgres_cache:
+                        await threads.run(cache_data, wrap)
+                        wrap = pd.concat([cache, wrap])
 
         try:
             wrap = wrap.set_index('block')
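
The loop body above also shows the fan-out pattern used per wrapper: schedule each coroutine eagerly with asyncio.create_task, await them all with asyncio.gather, and drop empty results before concatenating. A self-contained sketch of that pattern, with a hypothetical fetch standing in for process_harvests:

import asyncio
from typing import Optional

async def fetch(i: int) -> Optional[int]:
    # Hypothetical stand-in for process_harvests; returns None when a
    # batch yields nothing, just as the filtered gather above expects.
    await asyncio.sleep(0)
    return i if i % 2 == 0 else None

async def main() -> None:
    # create_task schedules each coroutine immediately so they run
    # concurrently; gather collects results in order, and the list
    # comprehension discards the empty ones.
    futs = [asyncio.create_task(fetch(i)) for i in range(5)]
    data = [d for d in await asyncio.gather(*futs) if d is not None]
    print(data)  # [0, 2, 4]

asyncio.run(main())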
