-
Notifications
You must be signed in to change notification settings - Fork 668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
statement-distribution: RFC103 implementation
#5883
base: sandreim/node_v2_descriptors
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,12 +46,17 @@ use polkadot_node_subsystem_util::{ | |
backing_implicit_view::View as ImplicitView, | ||
reputation::ReputationAggregator, | ||
runtime::{ | ||
fetch_claim_queue, request_min_backing_votes, ClaimQueueSnapshot, ProspectiveParachainsMode, | ||
request_min_backing_votes, request_node_features, ClaimQueueSnapshot, | ||
ProspectiveParachainsMode, | ||
}, | ||
}; | ||
use polkadot_primitives::{ | ||
vstaging::CoreState, AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, | ||
GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, SessionIndex, SessionInfo, | ||
node_features::FeatureIndex, | ||
vstaging::{ | ||
transpose_claim_queue, CandidateDescriptorVersion, CoreState, TransposedClaimQueue, | ||
}, | ||
AuthorityDiscoveryId, CandidateHash, CompactStatement, CoreIndex, GroupIndex, | ||
GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, SessionIndex, SessionInfo, | ||
SignedStatement, SigningContext, UncheckedSignedStatement, ValidatorId, ValidatorIndex, | ||
}; | ||
|
||
|
@@ -69,7 +74,7 @@ use futures::{ | |
use std::{ | ||
collections::{ | ||
hash_map::{Entry, HashMap}, | ||
HashSet, | ||
BTreeMap, HashSet, | ||
}, | ||
time::{Duration, Instant}, | ||
}; | ||
|
@@ -137,6 +142,12 @@ const COST_UNREQUESTED_RESPONSE_STATEMENT: Rep = | |
Rep::CostMajor("Un-requested Statement In Response"); | ||
const COST_INACCURATE_ADVERTISEMENT: Rep = | ||
Rep::CostMajor("Peer advertised a candidate inaccurately"); | ||
const COST_INVALID_DESCRIPTOR_VERSION: Rep = | ||
Rep::CostMajor("Candidate Descriptor version is invalid"); | ||
const COST_INVALID_CORE_INDEX: Rep = | ||
Rep::CostMajor("Candidate Descriptor contains an invalid core index"); | ||
const COST_INVALID_SESSION_INDEX: Rep = | ||
Rep::CostMajor("Candidate Descriptor contains an invalid session index"); | ||
|
||
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request"); | ||
const COST_INVALID_REQUEST_BITFIELD_SIZE: Rep = | ||
|
@@ -156,6 +167,7 @@ struct PerRelayParentState { | |
statement_store: StatementStore, | ||
seconding_limit: usize, | ||
session: SessionIndex, | ||
transposed_cq: TransposedClaimQueue, | ||
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>, | ||
disabled_validators: HashSet<ValidatorIndex>, | ||
} | ||
|
@@ -219,10 +231,17 @@ struct PerSessionState { | |
// getting the topology from the gossip-support subsystem | ||
grid_view: Option<grid::SessionTopologyView>, | ||
local_validator: Option<LocalValidatorIndex>, | ||
// `true` if v2 candidate receipts are allowed by the runtime | ||
v2_receipts: bool, | ||
} | ||
|
||
impl PerSessionState { | ||
fn new(session_info: SessionInfo, keystore: &KeystorePtr, backing_threshold: u32) -> Self { | ||
fn new( | ||
session_info: SessionInfo, | ||
keystore: &KeystorePtr, | ||
backing_threshold: u32, | ||
v2_receipts: bool, | ||
) -> Self { | ||
let groups = Groups::new(session_info.validator_groups.clone(), backing_threshold); | ||
let mut authority_lookup = HashMap::new(); | ||
for (i, ad) in session_info.discovery_keys.iter().cloned().enumerate() { | ||
|
@@ -235,7 +254,14 @@ impl PerSessionState { | |
) | ||
.map(|(_, index)| LocalValidatorIndex::Active(index)); | ||
|
||
PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator } | ||
PerSessionState { | ||
session_info, | ||
groups, | ||
authority_lookup, | ||
grid_view: None, | ||
local_validator, | ||
v2_receipts, | ||
} | ||
} | ||
|
||
fn supply_topology( | ||
|
@@ -271,6 +297,11 @@ impl PerSessionState { | |
fn is_not_validator(&self) -> bool { | ||
self.grid_view.is_some() && self.local_validator.is_none() | ||
} | ||
|
||
/// Returns `true` if v2 candidate receipts are enabled | ||
fn candidate_receipt_v2_enabled(&self) -> bool { | ||
self.v2_receipts | ||
} | ||
} | ||
|
||
pub(crate) struct State { | ||
|
@@ -280,7 +311,7 @@ pub(crate) struct State { | |
implicit_view: ImplicitView, | ||
candidates: Candidates, | ||
per_relay_parent: HashMap<Hash, PerRelayParentState>, | ||
per_session: HashMap<SessionIndex, PerSessionState>, | ||
per_session: BTreeMap<SessionIndex, PerSessionState>, | ||
// Topology might be received before first leaf update, where we | ||
// initialize the per_session_state, so cache it here until we | ||
// are able to use it. | ||
|
@@ -299,7 +330,7 @@ impl State { | |
implicit_view: Default::default(), | ||
candidates: Default::default(), | ||
per_relay_parent: HashMap::new(), | ||
per_session: HashMap::new(), | ||
per_session: BTreeMap::new(), | ||
peers: HashMap::new(), | ||
keystore, | ||
authorities: HashMap::new(), | ||
|
@@ -615,8 +646,18 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
|
||
let minimum_backing_votes = | ||
request_min_backing_votes(new_relay_parent, session_index, ctx.sender()).await?; | ||
let mut per_session_state = | ||
PerSessionState::new(session_info, &state.keystore, minimum_backing_votes); | ||
let node_features = | ||
request_node_features(new_relay_parent, session_index, ctx.sender()).await?; | ||
let mut per_session_state = PerSessionState::new( | ||
session_info, | ||
&state.keystore, | ||
minimum_backing_votes, | ||
node_features | ||
.unwrap_or(NodeFeatures::EMPTY) | ||
.get(FeatureIndex::CandidateReceiptV2 as usize) | ||
.map(|b| *b) | ||
.unwrap_or(false), | ||
); | ||
if let Some(topology) = state.unused_topologies.remove(&session_index) { | ||
per_session_state.supply_topology(&topology.topology, topology.local_index); | ||
} | ||
|
@@ -662,12 +703,11 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
.map_err(JfyiError::FetchValidatorGroups)? | ||
.1; | ||
|
||
let maybe_claim_queue = fetch_claim_queue(ctx.sender(), new_relay_parent) | ||
.await | ||
.unwrap_or_else(|err| { | ||
gum::debug!(target: LOG_TARGET, ?new_relay_parent, ?err, "handle_active_leaves_update: `claim_queue` API not available"); | ||
None | ||
}); | ||
let claim_queue = ClaimQueueSnapshot(polkadot_node_subsystem_util::request_claim_queue(new_relay_parent, ctx.sender()) | ||
.await | ||
.await | ||
.map_err(JfyiError::RuntimeApiUnavailable)? | ||
.map_err(JfyiError::FetchClaimQueue)?); | ||
|
||
let local_validator = per_session.local_validator.and_then(|v| { | ||
if let LocalValidatorIndex::Active(idx) = v { | ||
|
@@ -676,9 +716,8 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
&per_session.groups, | ||
&availability_cores, | ||
&group_rotation_info, | ||
&maybe_claim_queue, | ||
&claim_queue, | ||
seconding_limit, | ||
max_candidate_depth, | ||
) | ||
} else { | ||
Some(LocalValidatorState { grid_tracker: GridTracker::default(), active: None }) | ||
|
@@ -688,11 +727,12 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
let groups_per_para = determine_groups_per_para( | ||
availability_cores, | ||
group_rotation_info, | ||
&maybe_claim_queue, | ||
max_candidate_depth, | ||
&claim_queue, | ||
) | ||
.await; | ||
|
||
let transposed_cq = transpose_claim_queue(claim_queue.0); | ||
|
||
state.per_relay_parent.insert( | ||
new_relay_parent, | ||
PerRelayParentState { | ||
|
@@ -702,6 +742,7 @@ pub(crate) async fn handle_active_leaves_update<Context>( | |
session: session_index, | ||
groups_per_para, | ||
disabled_validators, | ||
transposed_cq, | ||
}, | ||
); | ||
} | ||
|
@@ -743,9 +784,8 @@ fn find_active_validator_state( | |
groups: &Groups, | ||
availability_cores: &[CoreState], | ||
group_rotation_info: &GroupRotationInfo, | ||
maybe_claim_queue: &Option<ClaimQueueSnapshot>, | ||
claim_queue: &ClaimQueueSnapshot, | ||
seconding_limit: usize, | ||
max_candidate_depth: usize, | ||
) -> Option<LocalValidatorState> { | ||
if groups.all().is_empty() { | ||
return None | ||
|
@@ -754,22 +794,7 @@ fn find_active_validator_state( | |
let our_group = groups.by_validator_index(validator_index)?; | ||
|
||
let core_index = group_rotation_info.core_for_group(our_group, availability_cores.len()); | ||
let paras_assigned_to_core = if let Some(claim_queue) = maybe_claim_queue { | ||
claim_queue.iter_claims_for_core(&core_index).copied().collect() | ||
} else { | ||
availability_cores | ||
.get(core_index.0 as usize) | ||
.and_then(|core_state| match core_state { | ||
CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id), | ||
CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core | ||
.next_up_on_available | ||
.as_ref() | ||
.map(|scheduled_core| scheduled_core.para_id), | ||
CoreState::Free | CoreState::Occupied(_) => None, | ||
}) | ||
.into_iter() | ||
.collect() | ||
}; | ||
let paras_assigned_to_core = claim_queue.iter_claims_for_core(&core_index).copied().collect(); | ||
let group_validators = groups.get(our_group)?.to_owned(); | ||
|
||
Some(LocalValidatorState { | ||
|
@@ -2176,37 +2201,18 @@ async fn provide_candidate_to_grid<Context>( | |
async fn determine_groups_per_para( | ||
availability_cores: Vec<CoreState>, | ||
group_rotation_info: GroupRotationInfo, | ||
maybe_claim_queue: &Option<ClaimQueueSnapshot>, | ||
max_candidate_depth: usize, | ||
claim_queue: &ClaimQueueSnapshot, | ||
) -> HashMap<ParaId, Vec<GroupIndex>> { | ||
let n_cores = availability_cores.len(); | ||
|
||
// Determine the core indices occupied by each para at the current relay parent. To support | ||
// on-demand parachains we also consider the core indices at next blocks. | ||
let schedule: HashMap<CoreIndex, Vec<ParaId>> = if let Some(claim_queue) = maybe_claim_queue { | ||
let schedule: HashMap<CoreIndex, Vec<ParaId>> = | ||
claim_queue | ||
.iter_all_claims() | ||
.map(|(core_index, paras)| (*core_index, paras.iter().copied().collect())) | ||
.collect() | ||
} else { | ||
availability_cores | ||
.into_iter() | ||
.enumerate() | ||
.filter_map(|(index, core)| match core { | ||
CoreState::Scheduled(scheduled_core) => | ||
Some((CoreIndex(index as u32), vec![scheduled_core.para_id])), | ||
CoreState::Occupied(occupied_core) => | ||
if max_candidate_depth >= 1 { | ||
occupied_core.next_up_on_available.map(|scheduled_core| { | ||
(CoreIndex(index as u32), vec![scheduled_core.para_id]) | ||
}) | ||
} else { | ||
None | ||
}, | ||
CoreState::Free => None, | ||
}) | ||
.collect() | ||
}; | ||
.collect(); | ||
|
||
|
||
let mut groups_per_para = HashMap::new(); | ||
// Map from `CoreIndex` to `GroupIndex` and collect as `HashMap`. | ||
|
@@ -2353,7 +2359,7 @@ async fn handle_incoming_manifest_common<'a, Context>( | |
peer: PeerId, | ||
peers: &HashMap<PeerId, PeerState>, | ||
per_relay_parent: &'a mut HashMap<Hash, PerRelayParentState>, | ||
per_session: &'a HashMap<SessionIndex, PerSessionState>, | ||
per_session: &'a BTreeMap<SessionIndex, PerSessionState>, | ||
candidates: &mut Candidates, | ||
candidate_hash: CandidateHash, | ||
relay_parent: Hash, | ||
|
@@ -3106,11 +3112,12 @@ pub(crate) async fn handle_response<Context>( | |
) { | ||
let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } = | ||
response.candidate_identifier(); | ||
let peer = response.requested_peer().clone(); | ||
|
||
gum::trace!( | ||
target: LOG_TARGET, | ||
?candidate_hash, | ||
peer = ?response.requested_peer(), | ||
?peer, | ||
"Received response", | ||
); | ||
|
||
|
@@ -3174,6 +3181,62 @@ pub(crate) async fn handle_response<Context>( | |
"Successfully received candidate" | ||
); | ||
|
||
if !per_session.candidate_receipt_v2_enabled() && | ||
candidate.descriptor.version() == CandidateDescriptorVersion::V2 | ||
{ | ||
gum::debug!( | ||
target: LOG_TARGET, | ||
?candidate_hash, | ||
?peer, | ||
"Version 2 candidate receipts are not enabled by the runtime" | ||
); | ||
|
||
// Punish peer. | ||
modify_reputation( | ||
reputation, | ||
ctx.sender(), | ||
peer, | ||
COST_INVALID_DESCRIPTOR_VERSION, | ||
) | ||
.await; | ||
return | ||
} | ||
|
||
// Get the latest session index & check candidate descriptor session index. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
shouldn't we check against the session index at the relay parent? |
||
match (candidate.descriptor.session_index(), state.per_session.last_key_value()) { | ||
(Some(session_index), Some((latest_session_index, _))) => { | ||
if &session_index != latest_session_index { | ||
// Punish peer. | ||
modify_reputation( | ||
reputation, | ||
ctx.sender(), | ||
peer, | ||
COST_INVALID_SESSION_INDEX, | ||
) | ||
.await; | ||
return | ||
} | ||
// TODO: determine if we need to buffer candidates at session boundaries. | ||
}, | ||
_ => {}, | ||
} | ||
|
||
// Validate the core index. | ||
if let Err(err) = candidate.check_core_index(&relay_parent_state.transposed_cq) { | ||
gum::debug!( | ||
target: LOG_TARGET, | ||
?candidate_hash, | ||
?err, | ||
?peer, | ||
"Received candidate has invalid core index" | ||
); | ||
|
||
// Punish peer. | ||
modify_reputation(reputation, ctx.sender(), peer, COST_INVALID_CORE_INDEX) | ||
.await; | ||
return | ||
} | ||
|
||
(candidate, persisted_validation_data, statements) | ||
}, | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we preserve the functionality for synchronous backing (max_candidate_depth 0)? In this case we shouldn't take into account cores that are occupied