-
Notifications
You must be signed in to change notification settings - Fork 43
checkpoint sync API specification #279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
96dafb0
feat: Added checkpoint sync API specification
ch4r10t33r d60b94b
chore: fixed lint errors
ch4r10t33r 48c57ab
fix: fixed type errors
ch4r10t33r c6d3daa
Update src/lean_spec/subspecs/api/client.py
ch4r10t33r 96cade5
fix: Addressed review comments
ch4r10t33r 27c975f
Merge branch 'main' of https://github.com/ch4r10t33r/leanSpec
ch4r10t33r 9e47f5f
fixed review comments
ch4r10t33r 6f9fe09
Merge branch 'main' into main
ch4r10t33r File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,19 @@ | ||
| """Subspecifications for the Lean Ethereum Python specifications.""" | ||
|
|
||
| from .api import ( | ||
| ApiServer, | ||
| ApiServerConfig, | ||
| CheckpointSyncError, | ||
| fetch_finalized_state, | ||
| verify_checkpoint_state, | ||
| ) | ||
| from .genesis import GenesisConfig | ||
|
|
||
| __all__ = ["GenesisConfig"] | ||
| __all__ = [ | ||
| "ApiServer", | ||
| "ApiServerConfig", | ||
| "CheckpointSyncError", | ||
| "GenesisConfig", | ||
| "fetch_finalized_state", | ||
| "verify_checkpoint_state", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| """ | ||
| API server module for checkpoint sync and node status endpoints. | ||
|
|
||
| Provides HTTP endpoints for: | ||
| - /lean/states/finalized - Serve finalized checkpoint state as SSZ | ||
| - /health - Health check endpoint | ||
|
|
||
| Also provides a client for checkpoint sync: | ||
| - fetch_finalized_state: Download finalized state from a node | ||
| """ | ||
|
|
||
| from .client import ( | ||
| CheckpointSyncError, | ||
| fetch_finalized_state, | ||
| verify_checkpoint_state, | ||
| ) | ||
| from .server import ApiServer, ApiServerConfig | ||
|
|
||
| __all__ = [ | ||
| "ApiServer", | ||
| "ApiServerConfig", | ||
| "CheckpointSyncError", | ||
| "fetch_finalized_state", | ||
| "verify_checkpoint_state", | ||
| ] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| """ | ||
| Checkpoint sync client for downloading finalized state from another node. | ||
|
|
||
| This client is used for fast synchronization - instead of syncing from genesis, | ||
| a node can download the finalized state from a trusted peer and start from there. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| import httpx | ||
|
|
||
| from lean_spec.subspecs.chain.config import DEVNET_CONFIG | ||
| from lean_spec.subspecs.ssz.hash import hash_tree_root | ||
|
|
||
| if TYPE_CHECKING: | ||
| from lean_spec.subspecs.containers import State | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # Constants | ||
| DEFAULT_TIMEOUT = 60.0 | ||
| FINALIZED_STATE_ENDPOINT = "/lean/states/finalized" | ||
|
|
||
|
|
||
| class CheckpointSyncError(Exception): | ||
| """Error during checkpoint sync.""" | ||
|
|
||
| pass | ||
|
|
||
|
|
||
| async def fetch_finalized_state(url: str, state_class: type[Any]) -> "State": | ||
| """ | ||
| Fetch finalized state from a node via checkpoint sync. | ||
|
|
||
| Downloads the finalized state as SSZ binary and deserializes it. | ||
|
|
||
| Args: | ||
| url: Base URL of the node API (e.g., "http://localhost:5052") | ||
| state_class: The State class to deserialize into | ||
|
|
||
| Returns: | ||
| The finalized State object | ||
|
|
||
| Raises: | ||
| CheckpointSyncError: If the request fails or state is invalid | ||
| """ | ||
| base_url = url.rstrip("/") | ||
| full_url = f"{base_url}{FINALIZED_STATE_ENDPOINT}" | ||
|
|
||
| logger.info(f"Fetching finalized state from {full_url}") | ||
|
|
||
| headers = { | ||
| "Accept": "application/octet-stream", | ||
| } | ||
|
|
||
| try: | ||
| async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: | ||
| response = await client.get(full_url, headers=headers) | ||
| response.raise_for_status() | ||
|
|
||
| ssz_data = response.content | ||
| logger.info(f"Downloaded {len(ssz_data)} bytes of SSZ state data") | ||
|
|
||
| state = state_class.decode_bytes(ssz_data) | ||
| logger.info(f"Deserialized state at slot {state.slot}") | ||
|
|
||
| return state | ||
|
|
||
| except httpx.RequestError as exc: | ||
| raise CheckpointSyncError( | ||
| f"Network error while connecting to {exc.request.url}: {exc}" | ||
| ) from exc | ||
| except httpx.HTTPStatusError as exc: | ||
| raise CheckpointSyncError( | ||
| f"HTTP error {exc.response.status_code}: {exc.response.text[:200]}" | ||
| ) from exc | ||
| except CheckpointSyncError: | ||
| raise | ||
| except Exception as e: | ||
| raise CheckpointSyncError(f"Failed to fetch state: {e}") from e | ||
|
|
||
|
|
||
| async def verify_checkpoint_state(state: "State") -> bool: | ||
| """ | ||
| Verify that a checkpoint state is valid. | ||
|
|
||
| Performs basic validation checks on the downloaded state. | ||
|
|
||
| Args: | ||
| state: The state to verify | ||
|
|
||
| Returns: | ||
| True if valid, False otherwise | ||
| """ | ||
| try: | ||
| computed_root = hash_tree_root(state) | ||
|
|
||
| if int(state.slot) < 0: | ||
| logger.error("Invalid state: negative slot") | ||
| return False | ||
|
|
||
| validator_count = len(state.validators) | ||
| if validator_count == 0: | ||
| logger.error("Invalid state: no validators") | ||
| return False | ||
|
|
||
| if validator_count > int(DEVNET_CONFIG.validator_registry_limit): | ||
| logger.error( | ||
| f"Invalid state: validator count {validator_count} exceeds " | ||
| f"registry limit {DEVNET_CONFIG.validator_registry_limit}" | ||
| ) | ||
| return False | ||
|
|
||
| root_preview = computed_root.hex()[:16] | ||
| logger.info(f"Checkpoint state verified: slot={state.slot}, root={root_preview}...") | ||
| return True | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"State verification failed: {e}") | ||
| return False |
ch4r10t33r marked this conversation as resolved.
Show resolved
Hide resolved
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| """ | ||
| API server for checkpoint sync and node status endpoints. | ||
|
|
||
| Provides HTTP endpoints for: | ||
| - /lean/states/finalized - Serve finalized checkpoint state as SSZ | ||
| - /health - Health check endpoint | ||
|
|
||
| This matches the checkpoint sync API implemented in zeam. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from collections.abc import Callable | ||
| from dataclasses import dataclass | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from aiohttp import web | ||
|
|
||
| if TYPE_CHECKING: | ||
| from lean_spec.subspecs.forkchoice import Store | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class ApiServerConfig: | ||
| """Configuration for the API server.""" | ||
|
|
||
| host: str = "0.0.0.0" | ||
| """Host address to bind to.""" | ||
|
|
||
| port: int = 5052 | ||
| """Port to listen on.""" | ||
|
|
||
| enabled: bool = True | ||
| """Whether the API server is enabled.""" | ||
|
|
||
|
|
||
| class ApiServer: | ||
| """ | ||
| HTTP API server for checkpoint sync and node status. | ||
|
|
||
| Provides endpoints for: | ||
| - Checkpoint sync: Download finalized state for fast sync | ||
| - Health checks: Verify node is running | ||
|
|
||
| Uses aiohttp to handle HTTP protocol details efficiently. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| config: ApiServerConfig, | ||
| store_getter: Callable[[], Store | None] = lambda: None, | ||
| ): | ||
| """ | ||
| Initialize the API server. | ||
|
|
||
| Args: | ||
| config: Server configuration. | ||
| store_getter: Callable that returns the current Store instance. | ||
| """ | ||
| self.config = config | ||
| self._store_getter = store_getter | ||
| self._runner: web.AppRunner | None = None | ||
| self._site: web.TCPSite | None = None | ||
|
|
||
| def set_store_getter(self, getter: Callable[[], Store | None]) -> None: | ||
| """ | ||
| Set the store getter function. | ||
|
|
||
| Args: | ||
| getter: Callable that returns the current Store instance. | ||
| """ | ||
| self._store_getter = getter | ||
|
|
||
| @property | ||
| def store(self) -> Store | None: | ||
| """Get the current Store instance.""" | ||
| return self._store_getter() | ||
|
|
||
| async def start(self) -> None: | ||
| """Start the API server in the background.""" | ||
| if not self.config.enabled: | ||
| logger.info("API server is disabled") | ||
| return | ||
|
|
||
| app = web.Application() | ||
| app.add_routes( | ||
| [ | ||
| web.get("/health", self._handle_health), | ||
| web.get("/lean/states/finalized", self._handle_finalized_state), | ||
| ] | ||
| ) | ||
|
|
||
| self._runner = web.AppRunner(app) | ||
| await self._runner.setup() | ||
|
|
||
| self._site = web.TCPSite(self._runner, self.config.host, self.config.port) | ||
| await self._site.start() | ||
|
|
||
| logger.info(f"API server listening on {self.config.host}:{self.config.port}") | ||
|
|
||
| async def run(self) -> None: | ||
| """ | ||
| Run the API server until shutdown. | ||
|
|
||
| This method blocks until stop() is called. | ||
| """ | ||
| await self.start() | ||
|
|
||
| # Keep running until stopped | ||
| while self._runner is not None: | ||
| await asyncio.sleep(1) | ||
|
|
||
| def stop(self) -> None: | ||
| """Request graceful shutdown.""" | ||
| if self._runner is not None: | ||
| asyncio.create_task(self._async_stop()) | ||
|
|
||
| async def _async_stop(self) -> None: | ||
| """Gracefully stop the server.""" | ||
| if self._runner: | ||
| await self._runner.cleanup() | ||
| self._runner = None | ||
| self._site = None | ||
| logger.info("API server stopped") | ||
|
|
||
| async def _handle_health(self, request: web.Request) -> web.Response: | ||
| """Handle health check endpoint.""" | ||
| return web.json_response({"status": "healthy", "service": "lean-spec-api"}) | ||
|
|
||
| async def _handle_finalized_state(self, request: web.Request) -> web.Response: | ||
| """ | ||
| Handle finalized checkpoint state endpoint. | ||
|
|
||
| Serves the finalized state as SSZ binary at /lean/states/finalized. | ||
| This endpoint is used for checkpoint sync - clients can download | ||
| the finalized state to bootstrap quickly instead of syncing from genesis. | ||
| """ | ||
| store = self.store | ||
| if store is None: | ||
| raise web.HTTPServiceUnavailable(reason="Store not initialized") | ||
|
|
||
| finalized = store.latest_finalized | ||
|
|
||
| if finalized.root not in store.states: | ||
| raise web.HTTPNotFound(reason="Finalized state not available") | ||
|
|
||
| state = store.states[finalized.root] | ||
|
|
||
| # Run CPU-intensive SSZ encoding in a separate thread | ||
| try: | ||
| ssz_bytes = await asyncio.to_thread(state.encode_bytes) | ||
| except Exception as e: | ||
| logger.error(f"Failed to encode state: {e}") | ||
| raise web.HTTPInternalServerError(reason="Encoding failed") from e | ||
|
|
||
| return web.Response(body=ssz_bytes, content_type="application/octet-stream") |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.