Skip to content

Commit cce04c0

Browse files
author
Neil Booth
committed
Reorganize code in preparation
1 parent 6264ece commit cce04c0

File tree

1 file changed

+65
-56
lines changed

1 file changed

+65
-56
lines changed

electrumx/server/block_processor.py

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,25 @@ def __init__(self, env, db, daemon, notifications):
161161
self.daemon = daemon
162162
self.notifications = notifications
163163

164-
self.coin = env.coin
164+
# Set when there is block processing to do, e.g. when new blocks come in, or a
165+
# reorg is needed.
165166
self.blocks_event = asyncio.Event()
167+
168+
# If the lock is successfully acquired, in-memory chain state
169+
# is consistent with self.height
170+
self.state_lock = asyncio.Lock()
171+
172+
# Signalled after backing up during a reorg
173+
self.backed_up_event = asyncio.Event()
174+
175+
self.coin = env.coin
166176
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
167177
self.logger = class_logger(__name__, self.__class__.__name__)
168178

169179
# Meta
170180
self.next_cache_check = 0
171181
self.touched = set()
172-
self.reorg_count = 0
182+
self.reorg_count = None
173183
self.height = -1
174184
self.tip = None
175185
self.tx_count = 0
@@ -184,63 +194,27 @@ def __init__(self, env, db, daemon, notifications):
184194
self.utxo_cache = {}
185195
self.db_deletes = []
186196

187-
# If the lock is successfully acquired, in-memory chain state
188-
# is consistent with self.height
189-
self.state_lock = asyncio.Lock()
190-
191-
# Signalled after backing up during a reorg
192-
self.backed_up_event = asyncio.Event()
193-
194197
async def run_with_lock(self, coro):
195-
# Shielded so that cancellations from shutdown don't lose work - when the task
196-
# completes the data will be flushed and then we shut down. Take the state lock
197-
# to be certain in-memory state is consistent and not being updated elsewhere.
198+
# Shielded so that cancellations from shutdown don't lose work. Cancellation will
199+
# cause fetch_and_process_blocks to block on the lock in flush(), the task completes,
200+
# and then the data is flushed. We also don't want user-signalled reorgs to happen
201+
# in the middle of processing blocks; they need to wait.
198202
async def run_locked():
199203
async with self.state_lock:
200204
return await coro
201205
return await asyncio.shield(run_locked())
202206

203-
async def check_and_advance_blocks(self, raw_blocks):
204-
'''Process the list of raw blocks passed. Detects and handles
205-
reorgs.
206-
'''
207-
if not raw_blocks:
208-
return
209-
blocks = [self.coin.block(raw_block) for raw_block in raw_blocks]
210-
headers = [block.header for block in blocks]
211-
hprevs = [self.coin.header_prevhash(h) for h in headers]
212-
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
213-
214-
if hprevs == chain:
215-
start = time.monotonic()
216-
await self.run_with_lock(self.advance_blocks(blocks))
217-
await self._maybe_flush()
218-
if not self.db.first_sync:
219-
s = '' if len(blocks) == 1 else 's'
220-
blocks_size = sum(len(block) for block in raw_blocks) / 1_000_000
221-
self.logger.info(f'processed {len(blocks):,d} block{s} size {blocks_size:.2f} MB '
222-
f'in {time.monotonic() - start:.1f}s')
223-
if self._caught_up_event.is_set():
224-
await self.notifications.on_block(self.touched, self.height)
225-
self.touched = set()
226-
elif hprevs[0] != chain[0]:
227-
await self.reorg_chain()
228-
else:
229-
# It is probably possible but extremely rare that what
230-
# bitcoind returns doesn't form a chain because it
231-
# reorg-ed the chain as it was processing the batched
232-
# block hash requests. Should this happen it's simplest
233-
# just to reset the prefetcher and try again.
234-
self.logger.warning('daemon blocks do not form a chain; '
235-
'resetting the prefetcher')
236-
await self.prefetcher.reset_height(self.height)
207+
def schedule_reorg(self, count):
208+
'''A count >= 0 is a user-forced reorg; < 0 is a natural reorg.'''
209+
self.reorg_count = count
210+
self.blocks_event.set()
237211

238-
async def reorg_chain(self, count=None):
212+
async def reorg_chain(self, count):
239213
'''Handle a chain reorganisation.
240214
241215
Count is the number of blocks to simulate a reorg, or None for
242216
a real reorg.'''
243-
if count is None:
217+
if count < 0:
244218
self.logger.info('chain reorg detected')
245219
else:
246220
self.logger.info(f'faking a reorg of {count:,d} blocks')
@@ -299,7 +273,7 @@ def diff_pos(hashes1, hashes2):
299273
return n
300274
return len(hashes)
301275

302-
if count is None:
276+
if count < 0:
303277
# A real reorg
304278
start = self.height - 1
305279
count = 1
@@ -381,6 +355,41 @@ def check_cache_size(self):
381355
return utxo_MB >= cache_MB * 4 // 5
382356
return None
383357

358+
async def check_and_advance_blocks(self, raw_blocks):
359+
'''Process the list of raw blocks passed. Detects and handles
360+
reorgs.
361+
'''
362+
if not raw_blocks:
363+
return
364+
blocks = [self.coin.block(raw_block) for raw_block in raw_blocks]
365+
headers = [block.header for block in blocks]
366+
hprevs = [self.coin.header_prevhash(h) for h in headers]
367+
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
368+
369+
if hprevs == chain:
370+
start = time.monotonic()
371+
await self.run_with_lock(self.advance_blocks(blocks))
372+
await self._maybe_flush()
373+
if not self.db.first_sync:
374+
s = '' if len(blocks) == 1 else 's'
375+
blocks_size = sum(len(block) for block in raw_blocks) / 1_000_000
376+
self.logger.info(f'processed {len(blocks):,d} block{s} size {blocks_size:.2f} MB '
377+
f'in {time.monotonic() - start:.1f}s')
378+
if self._caught_up_event.is_set():
379+
await self.notifications.on_block(self.touched, self.height)
380+
self.touched = set()
381+
elif hprevs[0] != chain[0]:
382+
self.schedule_reorg(-1)
383+
else:
384+
# It is probably possible but extremely rare that what
385+
# bitcoind returns doesn't form a chain because it
386+
# reorg-ed the chain as it was processing the batched
387+
# block hash requests. Should this happen it's simplest
388+
# just to reset the prefetcher and try again.
389+
self.logger.warning('daemon blocks do not form a chain; '
390+
'resetting the prefetcher')
391+
await self.prefetcher.reset_height(self.height)
392+
384393
async def advance_blocks(self, blocks):
385394
'''Advance the blocks.
386395
@@ -590,7 +599,7 @@ def spend_utxo(self, tx_hash, tx_idx):
590599
# Fast track is it being in the cache
591600
idx_packed = pack_le_uint32(tx_idx)
592601
cache_value = self.utxo_cache.pop(tx_hash + idx_packed, None)
593-
if cache_value:
602+
if cache_value:
594603
return cache_value
595604

596605
# Spend it from the DB.
@@ -631,11 +640,13 @@ async def _process_prefetched_blocks(self):
631640
if not self._caught_up_event.is_set():
632641
await self._first_caught_up()
633642
self._caught_up_event.set()
643+
634644
await self.blocks_event.wait()
635645
self.blocks_event.clear()
636-
if self.reorg_count:
646+
647+
if self.reorg_count is not None:
637648
await self.reorg_chain(self.reorg_count)
638-
self.reorg_count = 0
649+
self.reorg_count = None
639650
else:
640651
blocks = self.prefetcher.get_prefetched_blocks()
641652
await self.check_and_advance_blocks(blocks)
@@ -647,8 +658,7 @@ async def _first_caught_up(self):
647658
self.db.first_sync = False
648659
await self.flush(True)
649660
if first_sync:
650-
self.logger.info(f'{electrumx.version} synced to '
651-
f'height {self.height:,d}')
661+
self.logger.info(f'{electrumx.version} synced to height {self.height:,d}')
652662
# Reopen for serving
653663
await self.db.open_for_serving()
654664

@@ -690,7 +700,6 @@ def force_chain_reorg(self, count):
690700
Returns True if a reorg is queued, false if not caught up.
691701
'''
692702
if self._caught_up_event.is_set():
693-
self.reorg_count = count
694-
self.blocks_event.set()
703+
self.schedule_reorg(count)
695704
return True
696705
return False

0 commit comments

Comments
 (0)