Skip to content

Commit 4ca0a04

Browse files
author
Neil Booth
committed
Backup one block at a time.
1 parent 48db78b commit 4ca0a04

File tree

1 file changed

+43
-44
lines changed

1 file changed

+43
-44
lines changed

electrumx/server/block_processor.py

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from electrumx.lib.hash import hash_to_hex_str, HASHX_LEN
2121
from electrumx.lib.script import is_unspendable_legacy, is_unspendable_genesis
2222
from electrumx.lib.util import (
23-
chunks, class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64
23+
class_logger, pack_le_uint32, pack_le_uint64, unpack_le_uint64
2424
)
2525
from electrumx.server.db import FlushData
2626

@@ -209,7 +209,7 @@ def schedule_reorg(self, count):
209209
self.reorg_count = count
210210
self.blocks_event.set()
211211

212-
async def reorg_chain(self, count):
212+
async def _reorg_chain(self, count):
213213
'''Handle a chain reorganisation.
214214
215215
Count is the number of blocks to simulate a reorg, or None for
@@ -220,46 +220,47 @@ async def reorg_chain(self, count):
220220
self.logger.info(f'faking a reorg of {count:,d} blocks')
221221
await self.flush(True)
222222

223-
async def get_raw_blocks(last_height, hex_hashes):
224-
heights = range(last_height, last_height - len(hex_hashes), -1)
223+
async def get_raw_block(hex_hash, height):
225224
try:
226-
blocks = [self.db.read_raw_block(height) for height in heights]
227-
self.logger.info(f'read {len(blocks)} blocks from disk')
228-
return blocks
225+
block = self.db.read_raw_block(height)
226+
self.logger.info(f'read block {hex_hash} at height {height:,d} from disk')
229227
except FileNotFoundError:
230-
return await self.daemon.raw_blocks(hex_hashes)
231-
232-
_start, last, hashes = await self.reorg_hashes(count)
233-
# Reverse and convert to hex strings.
234-
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
235-
for hex_hashes in chunks(hashes, 50):
236-
raw_blocks = await get_raw_blocks(last, hex_hashes)
237-
await self.backup_blocks(raw_blocks)
228+
block = await self.daemon.raw_blocks([hex_hash])[0]
229+
self.logger.info(f'obtained block {hex_hash} at height {height:,d} from daemon')
230+
return block
231+
232+
_start, height, hashes = await self._reorg_hashes(count)
233+
hex_hashes = [hash_to_hex_str(block_hash) for block_hash in hashes]
234+
for hex_hash in reversed(hex_hashes):
235+
raw_block = await get_raw_block(hex_hash, height)
236+
await self._backup_block(raw_block)
238237
# self.touched can include other addresses which is harmless, but remove None.
239238
self.touched.discard(None)
240239
self.db.flush_backup(self.flush_data(), self.touched)
241-
last -= len(raw_blocks)
240+
height -= 1
241+
242+
self.logger.info('backed up to height {:,d}'.format(self.height))
242243

243244
await self.prefetcher.reset_height(self.height)
244245
self.backed_up_event.set()
245246
self.backed_up_event.clear()
246247

247-
async def reorg_hashes(self, count):
248+
async def _reorg_hashes(self, count):
248249
'''Return a pair (start, last, hashes) of blocks to back up during a
249250
reorg.
250251
251252
The hashes are returned in order of increasing height. Start
252253
is the height of the first hash, last of the last.
253254
'''
254-
start, count = await self.calc_reorg_range(count)
255+
start, count = await self._calc_reorg_range(count)
255256
last = start + count - 1
256257
s = '' if count == 1 else 's'
257258
self.logger.info(f'chain was reorganised replacing {count:,d} '
258259
f'block{s} at heights {start:,d}-{last:,d}')
259260

260261
return start, last, await self.db.fs_block_hashes(start, count)
261262

262-
async def calc_reorg_range(self, count):
263+
async def _calc_reorg_range(self, count):
263264
'''Calculate the reorg range'''
264265

265266
def diff_pos(hashes1, hashes2):
@@ -441,38 +442,36 @@ def advance_txs(self, txs, is_unspendable):
441442

442443
return undo_info
443444

444-
async def backup_blocks(self, raw_blocks):
445-
'''Backup the raw blocks and flush.
445+
async def _backup_block(self, raw_block):
446+
'''Backup the raw block and flush.
446447
447-
The blocks should be in order of decreasing height, starting at.
448-
self.height. A flush is performed once the blocks are backed up.
448+
The blocks should be in order of decreasing height, starting at. self.height. A
449+
flush is performed once the blocks are backed up.
449450
'''
450451
self.db.assert_flushed(self.flush_data())
451-
assert self.height >= len(raw_blocks)
452+
assert self.height > 0
452453
genesis_activation = self.coin.GENESIS_ACTIVATION
453454

454455
coin = self.coin
455-
for raw_block in raw_blocks:
456-
# Check and update self.tip
457-
block = coin.block(raw_block)
458-
header_hash = coin.header_hash(block.header)
459-
if header_hash != self.tip:
460-
raise ChainError('backup block {} not tip {} at height {:,d}'
461-
.format(hash_to_hex_str(header_hash),
462-
hash_to_hex_str(self.tip),
463-
self.height))
464-
self.tip = coin.header_prevhash(block.header)
465-
is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation
466-
else is_unspendable_legacy)
467-
self.backup_txs(block.transactions, is_unspendable)
468-
self.height -= 1
469-
self.db.tx_counts.pop()
470-
471-
await sleep(0)
472456

473-
self.logger.info('backed up to height {:,d}'.format(self.height))
457+
# Check and update self.tip
458+
block = coin.block(raw_block)
459+
header_hash = coin.header_hash(block.header)
460+
if header_hash != self.tip:
461+
raise ChainError('backup block {} not tip {} at height {:,d}'
462+
.format(hash_to_hex_str(header_hash),
463+
hash_to_hex_str(self.tip),
464+
self.height))
465+
self.tip = coin.header_prevhash(block.header)
466+
is_unspendable = (is_unspendable_genesis if self.height >= genesis_activation
467+
else is_unspendable_legacy)
468+
self._backup_txs(block.transactions, is_unspendable)
469+
self.height -= 1
470+
self.db.tx_counts.pop()
471+
472+
await sleep(0)
474473

475-
def backup_txs(self, txs, is_unspendable):
474+
def _backup_txs(self, txs, is_unspendable):
476475
# Prevout values, in order down the block (coinbase first if present)
477476
# undo_info is in reverse block order
478477
undo_info = self.db.read_undo_info(self.height)
@@ -612,7 +611,7 @@ async def _process_blocks(self):
612611
async def process_event():
613612
'''Perform any pending reorg, the process prefetched blocks.'''
614613
if self.reorg_count is not None:
615-
await self.reorg_chain(self.reorg_count)
614+
await self._reorg_chain(self.reorg_count)
616615
self.reorg_count = None
617616
blocks = self.prefetcher.get_prefetched_blocks()
618617
await self._advance_blocks(blocks)

0 commit comments

Comments
 (0)