Skip to content

Commit

Permalink
Allow beam sync to start from a trusted checkpoint
Browse files Browse the repository at this point in the history
This teaches beam sync a new CLI parameter:

--beam-from-checkpoint="block://blockhash/blocknumber/score"

When given, beam sync will use this as a checkpoint
to avoid having to download the entire chain of headers
first.

Relates to ethereum#892
  • Loading branch information
cburgdorf committed Aug 22, 2019
1 parent 35ab26f commit 55da0f0
Show file tree
Hide file tree
Showing 15 changed files with 442 additions and 30 deletions.
8 changes: 8 additions & 0 deletions newsfragments/921.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Allow beam sync to start from a trusted checkpoint.
Specify a checkpoint via CLI parameter such as:

``--beam-from-checkpoint="eth://block/byhash/<hash>?score=<score>"``

When given, beam sync will use this as a checkpoint
to avoid having to download the entire chain of headers
first.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from setuptools import setup, find_packages

PYEVM_DEPENDENCY = "py-evm==0.3.0a3"
PYEVM_DEPENDENCY = "py-evm==0.3.0a5" # noqa: E501


deps = {
Expand Down
51 changes: 51 additions & 0 deletions tests/core/cli/test_parse_checkpoint_uri.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import pytest

from eth_utils import (
encode_hex,
ValidationError,
)

from trinity.plugins.builtin.syncer.cli import (
parse_checkpoint_uri
)


@pytest.mark.parametrize(
'uri, expected',
(
(
'eth://block/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=11', # noqa: E501
('0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080', 78, 11),
),
(
'eth://block/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=1,1', # noqa: E501
('0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080', 78, 11),
),
(
'eth://block/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=1 1', # noqa: E501
('0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080', 78, 11),
),
)
)
def test_parse_checkpoint(uri, expected):
block_hash, block_number, block_score = expected
checkpoint = parse_checkpoint_uri(uri)
assert encode_hex(checkpoint.block_hash) == block_hash
assert checkpoint.score == block_score


@pytest.mark.parametrize(
'uri',
(
'meh://block/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=11', # noqa: E501
'eth://meh/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=11', # noqa: E501
'eth://block/meh/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=11', # noqa: E501
'eth://block/byhash/meh?score=78',
'eth://block/meh/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=11', # noqa: E501
'eth://block/meh/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080?score=meh', # noqa: E501
'eth://block/byhash/0x113f05289c685eb5b87d433c3e09ec2bfa51d6472cc37108d03b0113b11e3080',
)
)
def test_throws_validation_error(uri):
with pytest.raises(ValidationError):
parse_checkpoint_uri(uri)
34 changes: 31 additions & 3 deletions tests/core/p2p-proto/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from eth.db.atomic import AtomicDB
from eth.exceptions import HeaderNotFound
from eth.vm.forks.petersburg import PetersburgVM
from eth_utils import decode_hex
from lahja import ConnectionConfig, AsyncioEndpoint
from p2p.service import BaseService
import pytest
Expand All @@ -16,6 +17,7 @@
)
from trinity.protocol.eth.sync import ETHHeaderChainSyncer
from trinity.protocol.les.servers import LightRequestServer
from trinity.sync.common.checkpoint import Checkpoint
from trinity.sync.common.chain import (
SimpleBlockImporter,
)
Expand Down Expand Up @@ -146,6 +148,30 @@ async def test_skeleton_syncer(request, event_loop, event_bus, chaindb_fresh, ch
assert head.state_root in chaindb_fresh.db


@pytest.mark.asyncio
async def test_beam_syncer_with_checkpoint(
request,
event_loop,
event_bus,
chaindb_fresh,
chaindb_churner):

checkpoint = Checkpoint(
block_hash=decode_hex('0x5b8d32e4aebda3da7bdf2f0588cb42256e2ed0c268efec71b38278df8488a263'),
score=55,
)

await test_beam_syncer(
request,
event_loop,
event_bus,
chaindb_fresh,
chaindb_churner,
beam_to_block=66,
checkpoint=checkpoint,
)


# Identified tricky scenarios:
# - 66: Missing an account trie node required for account deletion trie fixups,
# when "resuming" execution after completing all transactions
Expand All @@ -163,7 +189,8 @@ async def test_beam_syncer(
event_bus,
chaindb_fresh,
chaindb_churner,
beam_to_block):
beam_to_block,
checkpoint=None):

client_context = ChainContextFactory(headerdb__db=chaindb_fresh.db)
server_context = ChainContextFactory(headerdb__db=chaindb_churner.db)
Expand Down Expand Up @@ -209,7 +236,8 @@ async def test_beam_syncer(
AsyncChainDB(chaindb_fresh.db),
client_peer_pool,
gatherer_endpoint,
beam_to_block,
force_beam_block_number=beam_to_block,
checkpoint=checkpoint,
)

client_peer.logger.info("%s is serving churner blocks", client_peer)
Expand Down Expand Up @@ -311,7 +339,7 @@ async def until_headers_requested(self):
def __init__(self, chain, db, peer_pool, token=None) -> None:
super().__init__(token=token)
self._chain = chain
self._header_syncer = ETHHeaderChainSyncer(chain, db, peer_pool, self.cancel_token)
self._header_syncer = ETHHeaderChainSyncer(chain, db, peer_pool, token=self.cancel_token)
self._single_header_syncer = self.HeaderSyncer_OnlyOne(self._header_syncer)
self._paused_header_syncer = self.HeaderSyncer_PauseThenRest(self._header_syncer)
self._draining_syncer = RegularChainBodySyncer(
Expand Down
1 change: 1 addition & 0 deletions trinity/db/eth1/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class AsyncChainDB(BaseAsyncChainDB):
coro_header_exists = async_method(BaseAsyncChainDB.header_exists)
coro_get_canonical_block_hash = async_method(BaseAsyncChainDB.get_canonical_block_hash)
coro_get_canonical_block_header_by_number = async_method(BaseAsyncChainDB.get_canonical_block_header_by_number) # noqa: E501
coro_persist_checkpoint_header = async_method(BaseAsyncChainDB.persist_checkpoint_header)
coro_persist_header = async_method(BaseAsyncChainDB.persist_header)
coro_persist_header_chain = async_method(BaseAsyncChainDB.persist_header_chain)
coro_persist_block = async_method(BaseAsyncChainDB.persist_block)
Expand Down
13 changes: 10 additions & 3 deletions trinity/db/eth1/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ async def coro_persist_header(self, header: BlockHeaderAPI) -> Tuple[BlockHeader
...

@abstractmethod
async def coro_persist_header_chain(self,
headers: Iterable[BlockHeaderAPI],
) -> Tuple[BlockHeaderAPI, ...]:
async def coro_persist_checkpoint_header(self, header: BlockHeaderAPI, score: int) -> None:
...

@abstractmethod
async def coro_persist_header_chain(
self,
headers: Iterable[BlockHeaderAPI],
genesis_parent_hash: Hash32=None
) -> Tuple[BlockHeaderAPI, ...]:
...


Expand All @@ -71,5 +77,6 @@ class AsyncHeaderDB(BaseAsyncHeaderDB):
coro_get_score = async_method(BaseAsyncHeaderDB.get_score)
coro_header_exists = async_method(BaseAsyncHeaderDB.header_exists)
coro_get_canonical_block_hash = async_method(BaseAsyncHeaderDB.get_canonical_block_hash)
coro_persist_checkpoint_header = async_method(BaseAsyncHeaderDB.persist_checkpoint_header)
coro_persist_header = async_method(BaseAsyncHeaderDB.persist_header)
coro_persist_header_chain = async_method(BaseAsyncHeaderDB.persist_header_chain)
81 changes: 81 additions & 0 deletions trinity/plugins/builtin/syncer/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import argparse
from pathlib import PurePosixPath
import re
import urllib
from typing import (
Any,
)

from eth_typing import (
Hash32,
)

from eth_utils import (
is_hex,
remove_0x_prefix,
decode_hex,
ValidationError,
)

from trinity.sync.common.checkpoint import Checkpoint


def is_block_hash(value: str) -> bool:
return is_hex(value) and len(remove_0x_prefix(value)) == 64


def remove_non_digits(value: str) -> str:
return re.sub("\D", "", value)


def parse_checkpoint_uri(uri: str) -> Checkpoint:
try:
parsed = urllib.parse.urlparse(uri)
except ValueError as e:
raise ValidationError(str(e))

scheme, netloc, path, query = parsed.scheme, parsed.netloc, parsed.path.lower(), parsed.query

try:
parsed_query = urllib.parse.parse_qsl(query)
except ValueError as e:
raise ValidationError(str(e))

query_dict = dict(parsed_query)

# we allow any kind of separator for a nicer UX. e.g. instead of "11487662456884849810705"
# one can use "114 876 624 568 848 498 107 05" or "1,487,662,456,884,849,810,705". This also
# allows copying out a value from e.g etherscan.
score = remove_non_digits(query_dict.get('score', ''))

is_by_hash = path.startswith('/byhash')
parts = PurePosixPath(parsed.path).parts

if len(parts) != 3 or scheme != 'eth' or netloc != 'block' or not is_by_hash or not score:
raise ValidationError(
'checkpoint string must be of the form'
'"eth://block/byhash/<hash>?score=<score>"'
)

block_hash = parts[2]

if not is_block_hash(block_hash):
raise ValidationError(f'Block hash must be valid hex string, got: {block_hash}')

if not score.isdigit():
raise ValidationError(f'Score (total difficulty) must be an integer, got: {score}')

return Checkpoint(Hash32(decode_hex(block_hash)), int(score))


class NormalizeCheckpointURI(argparse.Action):
"""
Normalize the URI describing a sync checkpoint.
"""
def __call__(self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
value: Any,
option_string: str=None) -> None:
parsed = parse_checkpoint_uri(value)
setattr(namespace, self.dest, parsed)
12 changes: 12 additions & 0 deletions trinity/plugins/builtin/syncer/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from trinity._utils.shutdown import (
exit_with_services,
)
from .cli import NormalizeCheckpointURI


class BaseSyncStrategy(ABC):
Expand Down Expand Up @@ -200,6 +201,16 @@ def configure_parser(cls, arg_parser: ArgumentParser) -> None:
default=None,
)

arg_parser.add_argument(
'--beam-from-checkpoint',
action=NormalizeCheckpointURI,
help=(
"Start beam sync from a trusted checkpoint specified using URI syntax:"
"eth://block/byhash/<hash>?score=<score>"
),
default=None,
)

async def sync(self,
args: Namespace,
logger: Logger,
Expand All @@ -215,6 +226,7 @@ async def sync(self,
base_db,
cast(ETHPeerPool, peer_pool),
event_bus,
args.beam_from_checkpoint,
args.force_beam_block_number,
cancel_token,
)
Expand Down
Loading

0 comments on commit 55da0f0

Please sign in to comment.