Skip to content
215 changes: 118 additions & 97 deletions tests/benchmark/compute/instruction/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest
from execution_testing import (
Account,
Address,
Alloc,
BenchmarkTestFiller,
Block,
Expand Down Expand Up @@ -54,32 +55,20 @@ def test_xcall(
pre: Alloc,
fork: Fork,
opcode: Op,
env: Environment,
gas_benchmark_value: int,
tx_gas_limit: int,
) -> None:
"""Benchmark a system execution where a single opcode execution."""
# The attack gas limit is the gas limit which the target tx will use The
# test will scale the block gas limit to setup the contracts accordingly to
# be able to pay for the contract deposit. This has to take into account
# the 200 gas per byte, but also the quadratic memory expansion costs which
# have to be paid each time the memory is being setup
# The attack gas limit represents the transaction gas limit cap or
# the block gas limit. If eip-7825 is applied, the test will create
# multiple transactions for contract deployment. It should account
# for the 200 gas per byte cost and the quadratic memory-expansion
# costs, which must be paid each time memory is initialized.
attack_gas_limit = gas_benchmark_value
max_contract_size = fork.max_code_size()

gas_costs = fork.gas_costs()

# Calculate the absolute minimum gas costs to deploy the contract This does
# not take into account setting up the actual memory (using KECCAK256 and
# XOR) so the actual costs of deploying the contract is higher
memory_expansion_gas_calculator = fork.memory_expansion_gas_calculator()
memory_gas_minimum = memory_expansion_gas_calculator(
new_bytes=len(bytes(max_contract_size))
)
code_deposit_gas_minimum = (
fork.gas_costs().G_CODE_DEPOSIT_BYTE * max_contract_size
+ memory_gas_minimum
)

intrinsic_gas_cost_calc = fork.transaction_intrinsic_cost_calculator()
# Calculate the loop cost of the attacker to query one address
loop_cost = (
Expand All @@ -90,23 +79,122 @@ def test_xcall(
+ gas_costs.G_COLD_ACCOUNT_ACCESS # Opcode cost
+ 30 # ~Gluing opcodes
)
# Calculate the number of contracts to be targeted
# Calculate an upper bound of the number of contracts to be targeted
num_contracts = (
# Base available gas = GAS_LIMIT - intrinsic - (out of loop MSTOREs)
attack_gas_limit - intrinsic_gas_cost_calc() - gas_costs.G_VERY_LOW * 4
) // loop_cost

# Set the block gas limit to a relative high value to ensure the code
# deposit tx fits in the block (there is enough gas available in the block
# to execute this)
minimum_gas_limit = code_deposit_gas_minimum * 2 * num_contracts
if env.gas_limit < minimum_gas_limit:
raise Exception(
f"`BENCHMARKING_MAX_GAS` ({env.gas_limit}) is no longer enough to"
f" support this test, which requires {minimum_gas_limit} gas for "
"its setup. Update the value or consider optimizing gas usage "
"during the setup phase of this test."
initcode, factory_address, factory_caller_address = (
_deploy_max_contract_factory(pre, fork)
)

# Deploy num_contracts via multiple txs (each capped by tx gas limit).
with TestPhaseManager.setup():
# Rough estimate (rounded down) of contracts per tx based on dominant
# cost factor only, and up to 90% of the block gas limit.
# The goal is to involve the minimum amount of gas pricing to avoid
# complexity and potential brittleness.
num_contracts_per_tx = int(tx_gas_limit * 0.9) // (
gas_costs.G_CODE_DEPOSIT_BYTE * max_contract_size
)
if num_contracts_per_tx == 0:
pytest.skip("tx_gas_limit too low to deploy max-size contract")
setup_txs = math.ceil(num_contracts / num_contracts_per_tx)

contracts_deployment_txs = []
for _ in range(setup_txs):
contracts_deployment_txs.append(
Transaction(
to=factory_caller_address,
gas_limit=tx_gas_limit,
data=Hash(num_contracts_per_tx),
sender=pre.fund_eoa(),
)
)

post = {}
for i in range(num_contracts):
deployed_contract_address = compute_create2_address(
address=factory_address,
salt=i,
initcode=initcode,
)
post[deployed_contract_address] = Account(nonce=1)

attack_call = Bytecode()
if opcode == Op.EXTCODECOPY:
attack_call = Op.EXTCODECOPY(
address=Op.SHA3(32 - 20 - 1, 85), dest_offset=96, size=1000
)
else:
# For the rest of the opcodes, we can use the same generic attack call
# since all only minimally need the `address` of the target.
attack_call = Op.POP(opcode(address=Op.SHA3(32 - 20 - 1, 85)))
attack_code = (
# Setup memory for later CREATE2 address generation loop.
# 0xFF+[Address(20bytes)]+[seed(32bytes)]+[initcode keccak(32bytes)]
Op.MSTORE(0, factory_address)
+ Op.MSTORE8(32 - 20 - 1, 0xFF)
+ Op.MSTORE(
32, Op.CALLDATALOAD(0)
) # Calldata is the starting value of the CREATE2 salt
+ Op.MSTORE(64, initcode.keccak256())
# Main loop
+ While(
body=attack_call + Op.MSTORE(32, Op.ADD(Op.MLOAD(32), 1)),
)
)

attack_address = pre.deploy_contract(code=attack_code)

with TestPhaseManager.execution():
full_txs = attack_gas_limit // tx_gas_limit
remainder = attack_gas_limit % tx_gas_limit

num_targeted_contracts_per_full_tx = (
# Base available gas:
# TX_GAS_LIMIT - intrinsic - (out of loop MSTOREs)
tx_gas_limit - intrinsic_gas_cost_calc() - gas_costs.G_VERY_LOW * 4
) // loop_cost
contract_start_index = 0
opcode_txs = []
for _ in range(full_txs):
opcode_txs.append(
Transaction(
to=attack_address,
gas_limit=tx_gas_limit,
data=Hash(contract_start_index),
sender=pre.fund_eoa(),
)
)
contract_start_index += num_targeted_contracts_per_full_tx
if remainder > intrinsic_gas_cost_calc(calldata=bytes(32)):
opcode_txs.append(
Transaction(
to=attack_address,
gas_limit=remainder,
data=Hash(contract_start_index),
sender=pre.fund_eoa(),
)
)

blockchain_test(
pre=pre,
post=post,
blocks=[
Block(txs=contracts_deployment_txs),
Block(txs=opcode_txs),
],
exclude_full_post_state_in_output=True,
)


def _deploy_max_contract_factory(
pre: Alloc,
fork: Fork,
) -> tuple[Bytecode, Address, Address]:
max_contract_size = fork.max_code_size()

# The initcode will take its address as a starting point to the input to
# the keccak hash function. It will reuse the output of the hash function
Expand Down Expand Up @@ -177,74 +265,7 @@ def test_xcall(
)
factory_caller_address = pre.deploy_contract(code=factory_caller_code)

with TestPhaseManager.setup():
contracts_deployment_tx = Transaction(
to=factory_caller_address,
gas_limit=env.gas_limit,
gas_price=10**6,
data=Hash(num_contracts),
sender=pre.fund_eoa(),
)

post = {}
deployed_contract_addresses = []
for i in range(num_contracts):
deployed_contract_address = compute_create2_address(
address=factory_address,
salt=i,
initcode=initcode,
)
post[deployed_contract_address] = Account(nonce=1)
deployed_contract_addresses.append(deployed_contract_address)

attack_call = Bytecode()
if opcode == Op.EXTCODECOPY:
attack_call = Op.EXTCODECOPY(
address=Op.SHA3(32 - 20 - 1, 85), dest_offset=96, size=1000
)
else:
# For the rest of the opcodes, we can use the same generic attack call
# since all only minimally need the `address` of the target.
attack_call = Op.POP(opcode(address=Op.SHA3(32 - 20 - 1, 85)))
attack_code = (
# Setup memory for later CREATE2 address generation loop.
# 0xFF+[Address(20bytes)]+[seed(32bytes)]+[initcode keccak(32bytes)]
Op.MSTORE(0, factory_address)
+ Op.MSTORE8(32 - 20 - 1, 0xFF)
+ Op.MSTORE(32, 0)
+ Op.MSTORE(64, initcode.keccak256())
# Main loop
+ While(
body=attack_call + Op.MSTORE(32, Op.ADD(Op.MLOAD(32), 1)),
)
)

if len(attack_code) > max_contract_size:
# TODO: A workaround could be to split the opcode code into multiple
# contracts and call them in sequence.
raise ValueError(
f"Code size {len(attack_code)} exceeds maximum "
f"code size {max_contract_size}"
)
opcode_address = pre.deploy_contract(code=attack_code)

with TestPhaseManager.execution():
opcode_tx = Transaction(
to=opcode_address,
gas_limit=attack_gas_limit,
gas_price=10**9,
sender=pre.fund_eoa(),
)

blockchain_test(
pre=pre,
post=post,
blocks=[
Block(txs=[contracts_deployment_tx]),
Block(txs=[opcode_tx]),
],
exclude_full_post_state_in_output=True,
)
return initcode, factory_address, factory_caller_address


@pytest.mark.parametrize(
Expand Down
Loading