Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
89d858b
feat(consume): add block production simulator.
spencer-tb Oct 17, 2025
c68a19b
feat(consume): some fixes.
spencer-tb Oct 17, 2025
b7467bf
feat(consume): more fixes.
spencer-tb Oct 17, 2025
a78269f
feat(consume): add production to consume cli.
spencer-tb Oct 17, 2025
5dbe780
feat(consume): add production to hive processors.
spencer-tb Oct 17, 2025
ec29576
feat(consume): move test production to simulator logic.
spencer-tb Oct 17, 2025
2255fe4
chore: temp remove filtering.
spencer-tb Oct 17, 2025
e1cc931
chore: fix production sim path.
spencer-tb Oct 17, 2025
5edcce2
chore: re add filtering.
spencer-tb Oct 17, 2025
a98571e
chore: add consume production to check live port.
spencer-tb Oct 17, 2025
731e164
chore: adjust timeouts.
spencer-tb Oct 17, 2025
ceb0640
chore: adjust timeouts.
spencer-tb Oct 17, 2025
15a4d6d
chore: fix get payload version.
spencer-tb Oct 17, 2025
5b26eda
chore: fix get payload version.
spencer-tb Oct 17, 2025
37f9bf5
chore: fix get payload version.
spencer-tb Oct 17, 2025
5c60cbe
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
98faf21
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
b0cbe12
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
67821f0
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
4fa5b3e
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
bd62975
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
0645f31
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
2f76ede
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
b93abda
chore: fix invalidity checks.
spencer-tb Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/cli/pytest_commands/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ def get_command_logic_test_paths(command_name: str) -> List[Path]:
command_logic_test_paths = [
base_path / "simulators" / "simulator_logic" / "test_via_sync.py"
]
elif command_name == "production":
command_logic_test_paths = [
base_path / "simulators" / "simulator_logic" / "test_via_production.py"
]
elif command_name == "direct":
command_logic_test_paths = [base_path / "direct" / "test_via_direct.py"]
else:
Expand Down Expand Up @@ -116,6 +120,12 @@ def sync() -> None:
pass


@consume_command(is_hive=True)
def production() -> None:
"""Client builds blocks from mempool transactions (tests block production)."""
pass


@consume.command(
context_settings={"ignore_unknown_options": True},
)
Expand Down
2 changes: 2 additions & 0 deletions src/cli/pytest_commands/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def process_args(self, args: List[str]) -> List[str]:
modified_args.extend(["-p", "pytest_plugins.consume.simulators.engine.conftest"])
elif self.command_name == "sync":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.sync.conftest"])
elif self.command_name == "production":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.production.conftest"])
elif self.command_name == "rlp":
modified_args.extend(["-p", "pytest_plugins.consume.simulators.rlp.conftest"])
else:
Expand Down
6 changes: 5 additions & 1 deletion src/pytest_plugins/consume/simulators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ def check_live_port(test_suite_name: str) -> Literal[8545, 8551]:
"""Port used by hive to check for liveness of the client."""
if test_suite_name == "eest/consume-rlp":
return 8545
elif test_suite_name in {"eest/consume-engine", "eest/consume-sync"}:
elif test_suite_name in {
"eest/consume-engine",
"eest/consume-sync",
"eest/consume-production",
}:
return 8551
raise ValueError(
f"Unexpected test suite name '{test_suite_name}' while setting HIVE_CHECK_LIVE_PORT."
Expand Down
3 changes: 3 additions & 0 deletions src/pytest_plugins/consume/simulators/production/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
Consume Production simulator. Tests block PRODUCTION (building) instead of validation.
"""
228 changes: 228 additions & 0 deletions src/pytest_plugins/consume/simulators/production/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
"""
Pytest fixtures for the `consume production` simulator.

Tests block PRODUCTION (not just validation) by having clients build blocks
from mempool transactions using forkchoiceUpdated + getPayload.
"""

import io
import logging
from typing import Mapping

import pytest
from hive.client import Client

from ethereum_test_exceptions import ExceptionMapper
from ethereum_test_fixtures import BlockchainEngineFixture
from ethereum_test_rpc import EngineRPC

pytest_plugins = (
"pytest_plugins.pytest_hive.pytest_hive",
"pytest_plugins.consume.simulators.base",
"pytest_plugins.consume.simulators.single_test_client",
"pytest_plugins.consume.simulators.test_case_description",
"pytest_plugins.consume.simulators.timing_data",
"pytest_plugins.consume.simulators.exceptions",
)
logger = logging.getLogger(__name__)


def pytest_configure(config: pytest.Config) -> None:
"""Set the supported fixture formats for the production simulator."""
config.supported_fixture_formats = [BlockchainEngineFixture] # type: ignore[attr-defined]


def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
"""
Filter out tests that don't meet production simulator requirements.
"""
with open("/tmp/production_filter_debug.log", "w") as f:
f.write("=" * 80 + "\n")
f.write("COLLECTION PHASE STARTING\n")
f.write("=" * 80 + "\n\n")

for item in items:
if not hasattr(item, "callspec"):
continue

# Check the actual function being called
test_function_name = item.function.__name__ if hasattr(item, "function") else None

# Only process if this is a production test
if test_function_name != "test_blockchain_via_production":
continue

f.write(f"\nTest: {item.nodeid}\n")

# Get test_case from parameters
test_case = item.callspec.params.get("test_case")
if test_case is None:
f.write(" >>> No test_case in params, skipping <<<\n")
continue

f.write(f" test_case type: {type(test_case).__name__}\n")
f.write(f" test_case.format: {test_case.format}\n")

# Check if this is a BlockchainEngineFixture format
if test_case.format != BlockchainEngineFixture:
f.write(" >>> Not BlockchainEngineFixture format, skipping <<<\n")
continue

f.write(" >>> Is BlockchainEngineFixture format <<<\n")

# Now we need to actually load the fixture to check payloads
# Get the fixtures_source from config
fixtures_source = item.config.fixtures_source # type: ignore[attr-defined]

# Load the fixture the same way the test does
from ethereum_test_fixtures.file import Fixtures

if fixtures_source.is_stdin:
# For stdin, fixture is already in test_case
from ethereum_test_fixtures.consume import TestCaseStream

if isinstance(test_case, TestCaseStream):
fixture = test_case.fixture
else:
f.write(" >>> Can't load fixture from stdin test_case <<<\n")
continue
else:
# For file-based, load from disk
from ethereum_test_fixtures.consume import TestCaseIndexFile

if not isinstance(test_case, TestCaseIndexFile):
f.write(" >>> Not TestCaseIndexFile <<<\n")
continue

fixtures_file_path = fixtures_source.path / test_case.json_path
f.write(f" Loading from: {fixtures_file_path}\n")

if not fixtures_file_path.exists():
f.write(" >>> File doesn't exist <<<\n")
continue

fixtures = Fixtures.model_validate_json(fixtures_file_path.read_text())
fixture = fixtures[test_case.id]

f.write(f" Fixture loaded! Type: {type(fixture).__name__}\n")

if not isinstance(fixture, BlockchainEngineFixture):
f.write(" >>> Loaded fixture is not BlockchainEngineFixture <<<\n")
continue

f.write(f" Number of payloads: {len(fixture.payloads)}\n")

# Filter: only single-transaction payloads
has_multi_tx_payload = False
has_invalid_payload = False
has_zero_tx_payload = False

for i, payload in enumerate(fixture.payloads):
f.write(f"\n Payload {i}:\n")

if hasattr(payload, "valid"):
try:
valid_result = payload.valid()
f.write(f" payload.valid() = {valid_result}\n")
except Exception as e:
f.write(f" payload.valid() ERROR: {e}\n")

if hasattr(payload, "validation_error"):
f.write(f" payload.validation_error = {payload.validation_error}\n")

if hasattr(payload, "error_code"):
f.write(f" payload.error_code = {payload.error_code}\n")

# Count transactions
tx_count = len(payload.params[0].transactions)
f.write(f" Transaction count: {tx_count}\n")

if tx_count == 0:
has_zero_tx_payload = True
break

if tx_count > 1:
has_multi_tx_payload = True
break

# Skip invalid payloads
should_skip = False
try:
if not payload.valid():
should_skip = True
f.write(" payload.valid() returned False\n")
except:
pass

if payload.validation_error is not None:
should_skip = True
f.write(" Has validation_error\n")

if payload.error_code is not None:
should_skip = True
f.write(" Has error_code\n")

if should_skip:
f.write(" >>> MARKING AS INVALID <<<\n")
has_invalid_payload = True
break

if has_zero_tx_payload:
f.write("\n >>> WILL SKIP: zero transactions <<<\n")
item.add_marker(
pytest.mark.skip(
reason="Production simulator: zero-transaction payloads not supported"
)
)
elif has_multi_tx_payload:
f.write("\n >>> WILL SKIP: multiple transactions <<<\n")
item.add_marker(
pytest.mark.skip(
reason="Production simulator: multi-transaction payloads not supported"
)
)
elif has_invalid_payload:
f.write("\n >>> WILL SKIP: invalid payload <<<\n")
item.add_marker(
pytest.mark.skip(
reason="Production simulator: only tests valid block production"
)
)
else:
f.write("\n >>> TEST WILL RUN <<<\n")


@pytest.fixture(scope="function")
def engine_rpc(client: Client, client_exception_mapper: ExceptionMapper | None) -> EngineRPC:
"""Initialize engine RPC client for the execution client under test."""
if client_exception_mapper:
return EngineRPC(
f"http://{client.ip}:8551",
response_validation_context={
"exception_mapper": client_exception_mapper,
},
)
return EngineRPC(f"http://{client.ip}:8551")


@pytest.fixture(scope="module")
def test_suite_name() -> str:
"""The name of the hive test suite used in this simulator."""
return "eest/consume-production"


@pytest.fixture(scope="module")
def test_suite_description() -> str:
"""The description of the hive test suite used in this simulator."""
return (
"Test block PRODUCTION (not validation) by having clients build blocks from "
"mempool transactions using forkchoiceUpdated + getPayload flow."
)


@pytest.fixture(scope="function")
def client_files(buffered_genesis: io.BufferedReader) -> Mapping[str, io.BufferedReader]:
"""Define the files that hive will start the client with."""
files = {}
files["/genesis.json"] = buffered_genesis
return files
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Helper functions for production simulator."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Helper functions for block production testing."""

import time
from typing import Any

from ethereum_test_base_types import Bytes, Hash
from ethereum_test_rpc import EthRPC


def wait_for_transaction_in_mempool(
eth_rpc: EthRPC,
tx_hash: Hash,
timeout: int = 10,
poll_interval: float = 0.1,
) -> bool:
"""
Wait for a transaction to appear in the mempool.

Returns True if transaction found, False if timeout reached.
"""
start = time.time()
while time.time() - start < timeout:
try:
tx = eth_rpc.get_transaction_by_hash(tx_hash)
if tx is not None:
return True
except Exception:
pass
time.sleep(poll_interval)

return False


def wait_for_payload_ready(
engine_rpc: Any,
payload_id: Bytes,
get_payload_version: int,
timeout: float = 5.0,
poll_interval: float = 0.1,
) -> Any:
"""
Poll until payload is ready to be retrieved.

Returns the built payload response when ready.
Raises TimeoutError if not ready within timeout.
"""
start = time.time()
last_exception = None

while time.time() - start < timeout:
try:
built_payload_response = engine_rpc.get_payload(
payload_id=payload_id,
version=get_payload_version,
)
return built_payload_response
except Exception as e:
last_exception = e
time.sleep(poll_interval)

elapsed = time.time() - start
raise TimeoutError(f"Payload not ready after {elapsed:.2f}s. Last error: {last_exception}")
Loading
Loading