Skip to content

custody subnet count decoding during discovery #6777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import
# Internals
../spec/[
beaconstate, state_transition_block, forks,
helpers, network, signatures, eip7594_helpers],
helpers, network, signatures, peerdas_helpers],
../consensus_object_pools/[
attestation_pool, blockchain_dag, blob_quarantine, block_quarantine,
data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool,
Expand Down Expand Up @@ -496,7 +496,7 @@ proc validateBlobSidecar*(
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id
proc validateDataColumnSidecar*(
dag: ChainDAGRef, quarantine: ref Quarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
dataColumnQuarantine: ref DataColumnQuarantine,
data_column_sidecar: DataColumnSidecar,
wallTime: BeaconTime, subnet_id: uint64):
Result[void, ValidationError] =
Expand All @@ -508,14 +508,14 @@ proc validateDataColumnSidecar*(
if not (data_column_sidecar.index < NUMBER_OF_COLUMNS):
return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS")

# [REJECT] The sidecar is for the correct subnet
# [REJECT] The sidecar is for the correct subnet
# -- i.e. `compute_subnet_for_data_column_sidecar(blob_sidecar.index) == subnet_id`.
if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id):
return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet")

# [IGNORE] The sidecar is not from a future slot
# (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
# `block_header.slot <= current_slot`(a client MAY queue future sidecars for
# [IGNORE] The sidecar is not from a future slot
# (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that
# `block_header.slot <= current_slot`(a client MAY queue future sidecars for
# processing at the appropriate slot).
if not (block_header.slot <=
(wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero):
Expand Down Expand Up @@ -608,7 +608,7 @@ proc validateDataColumnSidecar*(
data_column_sidecar.signed_block_header.signature):
return dag.checkedReject("DataColumnSidecar: Invalid proposer signature")

# [REJECT] The sidecar's column data is valid as
# [REJECT] The sidecar's column data is valid as
# verified by `verify_data_column_kzg_proofs(sidecar)`
block:
let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar)
Expand Down
18 changes: 16 additions & 2 deletions beacon_chain/networking/eth2_discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
std/[algorithm, sequtils],
chronos, chronicles,
eth/p2p/discoveryv5/[enr, protocol, node, random2],
../spec/datatypes/altair,
../spec/datatypes/[altair, fulu],
../spec/eth2_ssz_serialization,
".."/[conf, conf_light_client]

Expand Down Expand Up @@ -127,6 +127,7 @@ proc queryRandom*(
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits,
wantedCscnets: CscBits,
minScore: int): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
Expand All @@ -151,13 +152,26 @@ proc queryRandom*(
if not forkId.isCompatibleForkId(peerForkId):
continue

let cscCountBytes = n.record.get(enrCustodySubnetCountField, seq[byte])
if cscCountBytes.isOk():
let cscCountNode =
try:
SSZ.decode(cscCountBytes.get(), uint8)
except SerializationError as e:
debug "Could not decode the csc ENR field of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue

if wantedCscnets.countOnes().uint8 == cscCountNode:
score += 1

let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isOk():
let attnetsNode =
try:
SSZ.decode(attnetsBytes.get(), AttnetBits)
except SerializationError as e:
debug "Could not decode the attnets ERN bitfield of peer",
debug "Could not decode the attnets ENR bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue

Expand Down
23 changes: 17 additions & 6 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,8 @@ proc trimConnections(node: Eth2Node, count: int) =
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return

proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
proc getLowSubnets(node: Eth2Node, epoch: Epoch):
(AttnetBits, SyncnetBits, CscBits) =
# Returns the subnets required to have a healthy mesh
# The subnets are computed, to, in order:
# - Have 0 subnet with < `dLow` peers from topic subscription
Expand Down Expand Up @@ -1570,7 +1571,11 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) =
if epoch + 1 >= node.cfg.ALTAIR_FORK_EPOCH:
findLowSubnets(getSyncCommitteeTopic, SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT)
else:
default(SyncnetBits)
default(SyncnetBits),
if epoch >= node.cfg.FULU_FORK_EPOCH:
findLowSubnets(getDataColumnSidecarTopic, uint64, (DATA_COLUMN_SIDECAR_SUBNET_COUNT).int)
else:
default(CscBits)
)

proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
Expand All @@ -1579,23 +1584,29 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
while true:
let
currentEpoch = node.getBeaconTime().slotOrZero.epoch
(wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
(wantedAttnets, wantedSyncnets, wantedCscnets) = node.getLowSubnets(currentEpoch)
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
wantedCscnetsCount = wantedCscnets.countOnes()
outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
targetOutgoingPeers = max(node.wantedPeers div 10, 3)

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
outgoingPeers < targetOutgoingPeers:
wantedCscnetsCount > 0 or outgoingPeers < targetOutgoingPeers:

let
minScore =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
wantedCscnetsCount > 0:
1
else:
0
discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)
node.discoveryForkId,
wantedAttnets,
wantedSyncnets,
wantedCscnets,
minScore)

let newPeers = block:
var np = newSeq[PeerAddr]()
Expand Down
4 changes: 2 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import
./spec/datatypes/[altair, bellatrix, phase0],
./spec/[
deposit_snapshots, engine_authentication, weak_subjectivity,
eip7594_helpers],
peerdas_helpers],
./sync/[sync_protocol, light_client_protocol, sync_overseer],
./validators/[keystore_management, beacon_validators],
"."/[
Expand Down Expand Up @@ -535,7 +535,7 @@ proc initFullNode(
processor: processor,
network: node.network)
requestManager = RequestManager.init(
node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,13 @@ func getSyncSubnets*(
iterator blobSidecarTopics*(forkDigest: ForkDigest): string =
for subnet_id in BlobId:
yield getBlobSidecarTopic(forkDigest, subnet_id)

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
func getDataColumnSidecarTopic*(forkDigest: ForkDigest,
subnet_id: uint64): string =
eth2Prefix(forkDigest) & "data_column_sidecar_" & $subnet_id & "/ssz_snappy"

iterator dataColumnSidecarTopics*(forkDigest: ForkDigest,
targetSubnetCount: uint64): string =
for subnet_id in 0'u64..<targetSubnetCount:
yield getDataColumnSidecarTopic(forkDigest, subnet_id)
28 changes: 14 additions & 14 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0, deneb, fulu],
../spec/[forks, network, eip7594_helpers],
../spec/[forks, network, peerdas_helpers],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine,
Expand Down Expand Up @@ -299,23 +299,23 @@ proc checkPeerCustody*(rman: RequestManager,
peer: Peer):
bool =
# Returns true if the peer custodies atleast
# ONE of the common custody columns, straight
# ONE of the common custody columns, straight
# away returns true if the peer is a supernode.
if rman.supernode:
# For a supernode, it is always best/optimistic
# to filter other supernodes, rather than filter
# too many full nodes that have a subset of the custody
# columns
if peer.lookupCscFromPeer() ==
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true

else:
if peer.lookupCscFromPeer() ==
if peer.lookupCscFromPeer() ==
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64:
return true

elif peer.lookupCscFromPeer() ==
elif peer.lookupCscFromPeer() ==
CUSTODY_REQUIREMENT.uint64:

# Fetch the remote custody count
Expand All @@ -333,9 +333,9 @@ proc checkPeerCustody*(rman: RequestManager,
for local_column in rman.custody_columns_set:
if local_column notin remoteCustodyColumns:
return false

return true

else:
return false

Expand Down Expand Up @@ -551,7 +551,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()

const waitDur = TimeDiff(nanoseconds: DATA_COLUMN_GOSSIP_WAIT_TIME_NS)

var
Expand All @@ -574,7 +574,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = DataColumnIdentifier(block_root: columnless.root, index: idx)
if id.index in rman.custody_columns_set and id notin fetches and
if id.index in rman.custody_columns_set and id notin fetches and
len(forkyBlck.message.body.blob_kzg_commitments) != 0:
fetches.incl(id)
else:
Expand All @@ -583,7 +583,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
blk = columnless.root,
commitments = len(forkyBlck.message.body.blob_kzg_commitments)
ready.add(columnless.root)

for root in ready:
let columnless = rman.quarantine[].popColumnless(root).valueOr:
continue
Expand All @@ -593,7 +593,7 @@ proc getMissingDataColumns(rman: RequestManager): HashSet[DataColumnIdentifier]
proc requestManagerDataColumnLoop(
rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:

await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
Expand Down Expand Up @@ -623,7 +623,7 @@ proc requestManagerDataColumnLoop(
debug "Loaded orphaned data columns from storage", columnId
rman.dataColumnQuarantine[].put(data_column_sidecar)
var verifiers = newSeqOfCap[
Future[Result[void, VerifierError]]
Future[Result[void, VerifierError]]
.Raising([CancelledError])](blockRoots.len)
for blockRoot in blockRoots:
let blck = rman.quarantine[].popColumnless(blockRoot).valueOr:
Expand All @@ -644,7 +644,7 @@ proc requestManagerDataColumnLoop(
array[PARALLEL_REQUESTS_DATA_COLUMNS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REQUESTS_DATA_COLUMNS:
workers[i] = rman.fetchDataColumnsFromNetwork(columnIds)

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(columnIds)))

Expand Down
2 changes: 1 addition & 1 deletion tests/consensus_spec/test_fixture_networking.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
kzg4844/[kzg, kzg_abi],
stint,
eth/p2p/discoveryv5/[node],
../../beacon_chain/spec/eip7594_helpers,
../../beacon_chain/spec/peerdas_helpers,
../testutil,
./fixtures_utils, ./os_ops

Expand Down
13 changes: 9 additions & 4 deletions tests/test_discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proc generateNode(rng: ref HmacDrbgContext, port: Port,

# TODO: Add tests with a syncnets preference
const noSyncnetsPreference = SyncnetBits()
const noCscnetsPreference = CscBits()

procSuite "Eth2 specific discovery tests":
let
Expand Down Expand Up @@ -67,7 +68,8 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(34)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down Expand Up @@ -105,7 +107,8 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down Expand Up @@ -133,7 +136,8 @@ procSuite "Eth2 specific discovery tests":

block:
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 0

block:
Expand All @@ -148,7 +152,8 @@ procSuite "Eth2 specific discovery tests":
discard node1.addNode(nodes[][0])

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
enrForkId, attnetsSelected, noSyncnetsPreference,
noCscnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
Expand Down
4 changes: 2 additions & 2 deletions tests/test_eip7594_helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
results,
kzg4844/[kzg_abi, kzg],
./consensus_spec/[os_ops, fixtures_utils],
../beacon_chain/spec/[helpers, eip7594_helpers],
../beacon_chain/spec/[helpers, peerdas_helpers],
../beacon_chain/spec/datatypes/[fulu, deneb]

from std/strutils import rsplit
Expand Down Expand Up @@ -79,7 +79,7 @@ suite "EIP-7594 Unit Tests":
blob_count = rng.rand(1..(NUMBER_OF_COLUMNS.int))
blobs = createSampleKzgBlobs(blob_count, rng.rand(int))
extended_matrix = compute_matrix(blobs)

# Construct a matrix with some entries missing
var partial_matrix: seq[MatrixEntry]
for blob_entries in chunks(extended_matrix.get, kzg_abi.CELLS_PER_EXT_BLOB):
Expand Down
Loading