Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@ This project requires Python 3.12 or later and should be installed via `uv`:
uv python install 3.12
```

#### Installing Rust Nightly (Temporary)

> **Note:** This is a temporary requirement. In the future, the Rust bindings will be removed and all cryptographic primitives will be implemented directly in Python.

This project currently depends on `lean-multisig-py`, a Rust-based Python extension that requires Rust nightly to compile:

```bash
# Install rustup if not already installed
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# Install and set nightly as default
rustup install nightly
rustup default nightly

# Verify installation
rustc --version # Should show nightly version
```

### Setup

```bash
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies = [
"pydantic>=2.12.0,<3",
"typing-extensions>=4.4",
"lean-multisig-py>=0.1.0",
"httpx>=0.28.0,<1",
"aiohttp>=3.11.0,<4",
"cryptography>=46.0.0",
]

Expand Down
16 changes: 15 additions & 1 deletion src/lean_spec/subspecs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
"""Subspecifications for the Lean Ethereum Python specifications."""

from .api import (
ApiServer,
ApiServerConfig,
CheckpointSyncError,
fetch_finalized_state,
verify_checkpoint_state,
)
from .genesis import GenesisConfig

__all__ = ["GenesisConfig"]
__all__ = [
"ApiServer",
"ApiServerConfig",
"CheckpointSyncError",
"GenesisConfig",
"fetch_finalized_state",
"verify_checkpoint_state",
]
25 changes: 25 additions & 0 deletions src/lean_spec/subspecs/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
API server module for checkpoint sync and node status endpoints.

Provides HTTP endpoints for:
- /lean/states/finalized - Serve finalized checkpoint state as SSZ
- /health - Health check endpoint

Also provides a client for checkpoint sync:
- fetch_finalized_state: Download finalized state from a node
"""

from .client import (
CheckpointSyncError,
fetch_finalized_state,
verify_checkpoint_state,
)
from .server import ApiServer, ApiServerConfig

__all__ = [
"ApiServer",
"ApiServerConfig",
"CheckpointSyncError",
"fetch_finalized_state",
"verify_checkpoint_state",
]
123 changes: 123 additions & 0 deletions src/lean_spec/subspecs/api/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Checkpoint sync client for downloading finalized state from another node.

This client is used for fast synchronization - instead of syncing from genesis,
a node can download the finalized state from a trusted peer and start from there.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

import httpx

from lean_spec.subspecs.chain.config import DEVNET_CONFIG
from lean_spec.subspecs.ssz.hash import hash_tree_root

if TYPE_CHECKING:
from lean_spec.subspecs.containers import State

logger = logging.getLogger(__name__)

# Constants
DEFAULT_TIMEOUT = 60.0
FINALIZED_STATE_ENDPOINT = "/lean/states/finalized"


class CheckpointSyncError(Exception):
"""Error during checkpoint sync."""

pass


async def fetch_finalized_state(url: str, state_class: type[Any]) -> "State":
"""
Fetch finalized state from a node via checkpoint sync.

Downloads the finalized state as SSZ binary and deserializes it.

Args:
url: Base URL of the node API (e.g., "http://localhost:5052")
state_class: The State class to deserialize into

Returns:
The finalized State object

Raises:
CheckpointSyncError: If the request fails or state is invalid
"""
base_url = url.rstrip("/")
full_url = f"{base_url}{FINALIZED_STATE_ENDPOINT}"

logger.info(f"Fetching finalized state from {full_url}")

headers = {
"Accept": "application/octet-stream",
}

try:
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
response = await client.get(full_url, headers=headers)
response.raise_for_status()

ssz_data = response.content
logger.info(f"Downloaded {len(ssz_data)} bytes of SSZ state data")

state = state_class.decode_bytes(ssz_data)
logger.info(f"Deserialized state at slot {state.slot}")

return state

except httpx.RequestError as exc:
raise CheckpointSyncError(
f"Network error while connecting to {exc.request.url}: {exc}"
) from exc
except httpx.HTTPStatusError as exc:
raise CheckpointSyncError(
f"HTTP error {exc.response.status_code}: {exc.response.text[:200]}"
) from exc
except CheckpointSyncError:
raise
except Exception as e:
raise CheckpointSyncError(f"Failed to fetch state: {e}") from e


async def verify_checkpoint_state(state: "State") -> bool:
"""
Verify that a checkpoint state is valid.

Performs basic validation checks on the downloaded state.

Args:
state: The state to verify

Returns:
True if valid, False otherwise
"""
try:
computed_root = hash_tree_root(state)

if int(state.slot) < 0:
logger.error("Invalid state: negative slot")
return False

validator_count = len(state.validators)
if validator_count == 0:
logger.error("Invalid state: no validators")
return False

if validator_count > int(DEVNET_CONFIG.validator_registry_limit):
logger.error(
f"Invalid state: validator count {validator_count} exceeds "
f"registry limit {DEVNET_CONFIG.validator_registry_limit}"
)
return False

root_preview = computed_root.hex()[:16]
logger.info(f"Checkpoint state verified: slot={state.slot}, root={root_preview}...")
return True

except Exception as e:
logger.error(f"State verification failed: {e}")
return False
160 changes: 160 additions & 0 deletions src/lean_spec/subspecs/api/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""
API server for checkpoint sync and node status endpoints.

Provides HTTP endpoints for:
- /lean/states/finalized - Serve finalized checkpoint state as SSZ
- /health - Health check endpoint

This matches the checkpoint sync API implemented in zeam.
"""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING

from aiohttp import web

if TYPE_CHECKING:
from lean_spec.subspecs.forkchoice import Store

logger = logging.getLogger(__name__)


@dataclass(frozen=True, slots=True)
class ApiServerConfig:
"""Configuration for the API server."""

host: str = "0.0.0.0"
"""Host address to bind to."""

port: int = 5052
"""Port to listen on."""

enabled: bool = True
"""Whether the API server is enabled."""


class ApiServer:
"""
HTTP API server for checkpoint sync and node status.

Provides endpoints for:
- Checkpoint sync: Download finalized state for fast sync
- Health checks: Verify node is running

Uses aiohttp to handle HTTP protocol details efficiently.
"""

def __init__(
self,
config: ApiServerConfig,
store_getter: Callable[[], Store | None] = lambda: None,
):
"""
Initialize the API server.

Args:
config: Server configuration.
store_getter: Callable that returns the current Store instance.
"""
self.config = config
self._store_getter = store_getter
self._runner: web.AppRunner | None = None
self._site: web.TCPSite | None = None

def set_store_getter(self, getter: Callable[[], Store | None]) -> None:
"""
Set the store getter function.

Args:
getter: Callable that returns the current Store instance.
"""
self._store_getter = getter

@property
def store(self) -> Store | None:
"""Get the current Store instance."""
return self._store_getter()

async def start(self) -> None:
"""Start the API server in the background."""
if not self.config.enabled:
logger.info("API server is disabled")
return

app = web.Application()
app.add_routes(
[
web.get("/health", self._handle_health),
web.get("/lean/states/finalized", self._handle_finalized_state),
]
)

self._runner = web.AppRunner(app)
await self._runner.setup()

self._site = web.TCPSite(self._runner, self.config.host, self.config.port)
await self._site.start()

logger.info(f"API server listening on {self.config.host}:{self.config.port}")

async def run(self) -> None:
"""
Run the API server until shutdown.

This method blocks until stop() is called.
"""
await self.start()

# Keep running until stopped
while self._runner is not None:
await asyncio.sleep(1)

def stop(self) -> None:
"""Request graceful shutdown."""
if self._runner is not None:
asyncio.create_task(self._async_stop())

async def _async_stop(self) -> None:
"""Gracefully stop the server."""
if self._runner:
await self._runner.cleanup()
self._runner = None
self._site = None
logger.info("API server stopped")

async def _handle_health(self, request: web.Request) -> web.Response:
"""Handle health check endpoint."""
return web.json_response({"status": "healthy", "service": "lean-spec-api"})

async def _handle_finalized_state(self, request: web.Request) -> web.Response:
"""
Handle finalized checkpoint state endpoint.

Serves the finalized state as SSZ binary at /lean/states/finalized.
This endpoint is used for checkpoint sync - clients can download
the finalized state to bootstrap quickly instead of syncing from genesis.
"""
store = self.store
if store is None:
raise web.HTTPServiceUnavailable(reason="Store not initialized")

finalized = store.latest_finalized

if finalized.root not in store.states:
raise web.HTTPNotFound(reason="Finalized state not available")

state = store.states[finalized.root]

# Run CPU-intensive SSZ encoding in a separate thread
try:
ssz_bytes = await asyncio.to_thread(state.encode_bytes)
except Exception as e:
logger.error(f"Failed to encode state: {e}")
raise web.HTTPInternalServerError(reason="Encoding failed") from e

return web.Response(body=ssz_bytes, content_type="application/octet-stream")
Loading
Loading