-
Notifications
You must be signed in to change notification settings - Fork 668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Collation fetching fairness #4880
base: master
Are you sure you want to change the base?
Changes from 80 commits
f4738dc
c7074da
73eee87
fa321ce
96392a5
0f28aa8
e5ea548
9abc898
c07890b
e50440e
42b05c7
2f5a466
ff96ef9
e837689
91cdd13
9f2d59b
a10c86d
b39858a
b30f340
c0f18b9
703ed6d
fba7ca6
d4f4ce2
5f52712
6c73e24
752f3cc
f0069f1
6b9f0b3
5f6dcdd
b8c1b85
f26362f
d6857fc
cde28cd
4c3db2a
b2bbdfe
e220cb4
01d121e
7b3c002
5dffdde
1c1744b
aaccab1
b1df2e3
ce3a95e
fe3c09d
b9ab579
fe623bc
d216689
ea99c7a
ee155f5
55b7902
515a784
4ef6919
bd7174f
df6165e
4c5c271
b0e4627
d1cf41d
df3a215
b70807b
f047036
94e4fc3
386488b
ff312c9
88d0307
af78352
d636091
2bb82eb
c782058
903f7f4
cefbce8
cb69361
1142a90
4438349
e82c386
4b2d4c5
1c91371
be34132
5c7b2ac
62c6473
9e3f62d
a4bc21f
6c103df
15e3a74
d6b35ca
586b56b
a04d480
13d5d15
86870d0
7b822af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,16 +18,25 @@ | |
//! | ||
//! Usually a path of collations is as follows: | ||
//! 1. First, collation must be advertised by collator. | ||
//! 2. If the advertisement was accepted, it's queued for fetch (per relay parent). | ||
//! 3. Once it's requested, the collation is said to be Pending. | ||
//! 4. Pending collation becomes Fetched once received, we send it to backing for validation. | ||
//! 5. If it turns to be invalid or async backing allows seconding another candidate, carry on | ||
//! 2. The validator inspects the claim queue and decides if the collation should be fetched | ||
//! based on the entries there. A parachain can't have more fetched collations than the | ||
//! entries in the claim queue at a specific relay parent. When calculating this limit the | ||
//! validator counts all advertisements within its view not just at the relay parent. | ||
//! 3. If the advertisement was accepted, it's queued for fetch (per relay parent). | ||
//! 4. Once it's requested, the collation is said to be Pending. | ||
//! 5. Pending collation becomes Fetched once received, we send it to backing for validation. | ||
//! 6. If it turns to be invalid or async backing allows seconding another candidate, carry on | ||
//! with the next advertisement, otherwise we're done with this relay parent. | ||
//! | ||
//! ┌──────────────────────────────────────────┐ | ||
//! └─▶Advertised ─▶ Pending ─▶ Fetched ─▶ Validated | ||
|
||
use std::{collections::VecDeque, future::Future, pin::Pin, task::Poll}; | ||
//! ┌───────────────────────────────────┐ | ||
//! └─▶Waiting ─▶ Fetching ─▶ WaitingOnValidation | ||
|
||
use std::{ | ||
collections::{BTreeMap, VecDeque}, | ||
future::Future, | ||
pin::Pin, | ||
task::Poll, | ||
}; | ||
|
||
use futures::{future::BoxFuture, FutureExt}; | ||
use polkadot_node_network_protocol::{ | ||
|
@@ -36,9 +45,7 @@ use polkadot_node_network_protocol::{ | |
PeerId, | ||
}; | ||
use polkadot_node_primitives::PoV; | ||
use polkadot_node_subsystem_util::{ | ||
metrics::prometheus::prometheus::HistogramTimer, runtime::ProspectiveParachainsMode, | ||
}; | ||
use polkadot_node_subsystem_util::metrics::prometheus::prometheus::HistogramTimer; | ||
use polkadot_primitives::{ | ||
CandidateHash, CandidateReceipt, CollatorId, Hash, HeadData, Id as ParaId, | ||
PersistedValidationData, | ||
|
@@ -185,12 +192,10 @@ pub struct PendingCollationFetch { | |
pub enum CollationStatus { | ||
/// We are waiting for a collation to be advertised to us. | ||
Waiting, | ||
/// We are currently fetching a collation. | ||
Fetching, | ||
/// We are waiting that a collation is being validated. | ||
WaitingOnValidation, | ||
/// We have seconded a collation. | ||
Seconded, | ||
/// We are currently fetching a collation for the specified `ParaId`. | ||
Fetching(ParaId), | ||
/// We are waiting that a collation is being validated for the specified `ParaId`. | ||
WaitingOnValidation(ParaId), | ||
} | ||
|
||
impl Default for CollationStatus { | ||
|
@@ -199,23 +204,16 @@ impl Default for CollationStatus { | |
} | ||
} | ||
|
||
impl CollationStatus { | ||
/// Downgrades to `Waiting`, but only if `self != Seconded`. | ||
fn back_to_waiting(&mut self, relay_parent_mode: ProspectiveParachainsMode) { | ||
match self { | ||
Self::Seconded => | ||
if relay_parent_mode.is_enabled() { | ||
// With async backing enabled it's allowed to | ||
// second more candidates. | ||
*self = Self::Waiting | ||
}, | ||
_ => *self = Self::Waiting, | ||
} | ||
} | ||
/// The number of claims in the claim queue and seconded candidates count for a specific `ParaId`. | ||
#[derive(Default, Debug)] | ||
struct CandidatesStatePerPara { | ||
/// How many collations have been seconded. | ||
pub seconded_per_para: usize, | ||
// Claims in the claim queue for the `ParaId`. | ||
pub claims_per_para: usize, | ||
} | ||
|
||
/// Information about collations per relay parent. | ||
#[derive(Default)] | ||
pub struct Collations { | ||
/// What is the current status in regards to a collation for this relay parent? | ||
pub status: CollationStatus, | ||
|
@@ -224,75 +222,116 @@ pub struct Collations { | |
/// This is the currently last started fetch, which did not exceed `MAX_UNSHARED_DOWNLOAD_TIME` | ||
/// yet. | ||
pub fetching_from: Option<(CollatorId, Option<CandidateHash>)>, | ||
/// Collation that were advertised to us, but we did not yet fetch. | ||
pub waiting_queue: VecDeque<(PendingCollation, CollatorId)>, | ||
/// How many collations have been seconded. | ||
pub seconded_count: usize, | ||
/// Collation that were advertised to us, but we did not yet request or fetch. Grouped by | ||
/// `ParaId`. | ||
waiting_queue: BTreeMap<ParaId, VecDeque<(PendingCollation, CollatorId)>>, | ||
/// Number of seconded candidates and claims in the claim queue per `ParaId`. | ||
candidates_state: BTreeMap<ParaId, CandidatesStatePerPara>, | ||
} | ||
|
||
impl Collations { | ||
pub(super) fn new(group_assignments: &Vec<ParaId>) -> Self { | ||
let mut candidates_state = BTreeMap::<ParaId, CandidatesStatePerPara>::new(); | ||
|
||
for para_id in group_assignments { | ||
candidates_state.entry(*para_id).or_default().claims_per_para += 1; | ||
} | ||
|
||
Self { | ||
status: Default::default(), | ||
fetching_from: None, | ||
waiting_queue: Default::default(), | ||
candidates_state, | ||
} | ||
} | ||
|
||
/// Note a seconded collation for a given para. | ||
pub(super) fn note_seconded(&mut self) { | ||
self.seconded_count += 1 | ||
pub(super) fn note_seconded(&mut self, para_id: ParaId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we also note when we get an invalid collation and make sure we can second something else at that slot? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We count the pending fetch/second via |
||
self.candidates_state.entry(para_id).or_default().seconded_per_para += 1; | ||
gum::trace!(target: LOG_TARGET, ?para_id, new_count=self.candidates_state.entry(para_id).or_default().seconded_per_para, "Note seconded."); | ||
} | ||
|
||
/// Returns the next collation to fetch from the `waiting_queue`. | ||
/// Adds a new collation to the waiting queue for the relay parent. This function doesn't | ||
/// perform any limits check. The caller should assure that the collation limit is respected. | ||
pub(super) fn add_to_waiting_queue(&mut self, collation: (PendingCollation, CollatorId)) { | ||
self.waiting_queue.entry(collation.0.para_id).or_default().push_back(collation); | ||
} | ||
|
||
/// Picks a collation to fetch from the waiting queue. | ||
/// When fetching collations we need to ensure that each parachain has got a fair core time | ||
/// share depending on its assignments in the claim queue. This means that the number of | ||
/// collations seconded per parachain should ideally be equal to the number of claims for the | ||
/// particular parachain in the claim queue. | ||
/// | ||
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`]. | ||
/// To achieve this each seconded collation is mapped to an entry from the claim queue. The next | ||
/// fetch is the first unfulfilled entry from the claim queue for which there is an | ||
/// advertisement. | ||
/// | ||
/// Returns `Some(_)` if there is any collation to fetch, the `status` is not `Seconded` and | ||
/// the passed in `finished_one` is the currently `waiting_collation`. | ||
pub(super) fn get_next_collation_to_fetch( | ||
/// `unfulfilled_claim_queue_entries` represents all claim queue entries which are still not | ||
/// fulfilled. | ||
pub(super) fn pick_a_collation_to_fetch( | ||
&mut self, | ||
finished_one: &(CollatorId, Option<CandidateHash>), | ||
relay_parent_mode: ProspectiveParachainsMode, | ||
unfulfilled_claim_queue_entries: Vec<ParaId>, | ||
) -> Option<(PendingCollation, CollatorId)> { | ||
// If finished one does not match waiting_collation, then we already dequeued another fetch | ||
// to replace it. | ||
if let Some((collator_id, maybe_candidate_hash)) = self.fetching_from.as_ref() { | ||
// If a candidate hash was saved previously, `finished_one` must include this too. | ||
if collator_id != &finished_one.0 && | ||
maybe_candidate_hash.map_or(true, |hash| Some(&hash) != finished_one.1.as_ref()) | ||
gum::trace!( | ||
target: LOG_TARGET, | ||
waiting_queue=?self.waiting_queue, | ||
candidates_state=?self.candidates_state, | ||
"Pick a collation to fetch." | ||
); | ||
|
||
for assignment in unfulfilled_claim_queue_entries { | ||
// if there is an unfulfilled assignment - return it | ||
if let Some(collation) = self | ||
.waiting_queue | ||
.get_mut(&assignment) | ||
.and_then(|collations| collations.pop_front()) | ||
{ | ||
gum::trace!( | ||
target: LOG_TARGET, | ||
waiting_collation = ?self.fetching_from, | ||
?finished_one, | ||
"Not proceeding to the next collation - has already been done." | ||
); | ||
return None | ||
return Some(collation) | ||
} | ||
} | ||
self.status.back_to_waiting(relay_parent_mode); | ||
|
||
None | ||
} | ||
|
||
// Returns `true` if there is a pending collation for the specified `ParaId`. | ||
fn is_pending_for_para(&self, para_id: &ParaId) -> bool { | ||
match self.status { | ||
// We don't need to fetch any other collation when we already have seconded one. | ||
CollationStatus::Seconded => None, | ||
CollationStatus::Waiting => | ||
if self.is_seconded_limit_reached(relay_parent_mode) { | ||
None | ||
} else { | ||
self.waiting_queue.pop_front() | ||
}, | ||
CollationStatus::WaitingOnValidation | CollationStatus::Fetching => | ||
unreachable!("We have reset the status above!"), | ||
CollationStatus::Fetching(pending_para_id) if pending_para_id == *para_id => true, | ||
CollationStatus::WaitingOnValidation(pending_para_id) | ||
if pending_para_id == *para_id => | ||
true, | ||
_ => false, | ||
} | ||
} | ||
|
||
/// Checks the limit of seconded candidates. | ||
pub(super) fn is_seconded_limit_reached( | ||
&self, | ||
relay_parent_mode: ProspectiveParachainsMode, | ||
) -> bool { | ||
let seconded_limit = | ||
if let ProspectiveParachainsMode::Enabled { max_candidate_depth, .. } = | ||
relay_parent_mode | ||
{ | ||
max_candidate_depth + 1 | ||
} else { | ||
1 | ||
}; | ||
self.seconded_count >= seconded_limit | ||
// Returns the number of seconded and likely soon to be seconded collations for the specified | ||
// `ParaId`. | ||
pub(super) fn seconded_and_pending_for_para(&self, para_id: &ParaId) -> usize { | ||
let seconded_for_para = self | ||
.candidates_state | ||
.get(¶_id) | ||
.map(|state| state.seconded_per_para) | ||
.unwrap_or_default(); | ||
let pending_for_para = self.is_pending_for_para(para_id) as usize; | ||
|
||
gum::trace!( | ||
target: LOG_TARGET, | ||
?para_id, | ||
seconded_for_para, | ||
pending_for_para, | ||
"Seconded and pending for para." | ||
); | ||
|
||
seconded_for_para + pending_for_para | ||
} | ||
|
||
// Returns the number of claims in the claim queue for the specified `ParaId`. | ||
pub(super) fn claims_for_para(&self, para_id: &ParaId) -> usize { | ||
self.candidates_state | ||
.get(para_id) | ||
.map(|state| state.claims_per_para) | ||
.unwrap_or_default() | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally and before async backing once we've reached Seconded status we knew we are done here. Right now we can only be satisfied with collations once we reach the seconding limit but I see that we do not have a corresponding status here.
When we reach the seconding limit and no more ocllations are expected what will be the status here?
Or is it nonsensical to even refer to a CollationStatus when we reach the limit? (Dont think so)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do it. The lifetime right now is Waiting -> Fetching -> WaitingOnValidation and back to Waiting. We can have an additional
Satisfied
or something similar which indicates that we can't accept any more collations at this relay parent.Then on each seconded collation we can call
seconded_and_pending_for_para_in_view
and if we have reached the claim queue limit we can set the state toSatisfied
and know that nothing can be accepted at this relay parent.The benefits are that once the relay parent 'is complete' (all claim queue entries are filled) we won't need to run any checks in order to reject an incoming collation.
The drawbacks are that on each seconded collation we'll have to run the same function. So unless a collator is spamming us the benefit is minimal. And since we are limiting the number of advertisements per collator we can't get spammed too much.
We can explore this further but it better be a follow up task.