Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Ensure that we fetch another collation if the first collation was inv…
Browse files Browse the repository at this point in the history
…alid (#3362)

* Ensure that we fetch another collation if the first collation was invalid

* Feedback
  • Loading branch information
bkchr authored Jun 28, 2021
1 parent f17bc16 commit 2b855f3
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 52 deletions.
99 changes: 71 additions & 28 deletions node/network/collator-protocol/src/validator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,14 @@ type PendingCollationFetch = (
);

/// The status of the collations in [`CollationsPerRelayParent`].
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
enum CollationStatus {
/// We are waiting for a collation to be advertised to us.
Waiting,
/// We are currently fetching a collation.
Fetching,
/// We are waiting that a collation is being validated.
WaitingOnValidation,
/// We have seconded a collation.
Seconded,
}
Expand All @@ -496,6 +498,16 @@ impl Default for CollationStatus {
}
}

impl CollationStatus {
/// Downgrades to `Waiting`, but only if `self != Seconded`.
fn back_to_waiting(&mut self) {
match self {
Self::Seconded => {},
_ => *self = Self::Waiting,
}
}
}

/// Information about collations per relay parent.
#[derive(Default)]
struct CollationsPerRelayParent {
Expand All @@ -505,6 +517,25 @@ struct CollationsPerRelayParent {
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
}

impl CollationsPerRelayParent {
/// Returns the next collation to fetch from the `unfetched_collations`.
///
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
///
/// Returns `Some(_)` if there is any collation to fetch and the `status` is not `Seconded`.
pub fn get_next_collation_to_fetch(&mut self) -> Option<(PendingCollation, CollatorId)> {
self.status.back_to_waiting();

match self.status {
// We don't need to fetch any other collation when we already have seconded one.
CollationStatus::Seconded => None,
CollationStatus::Waiting => self.unfetched_collations.pop(),
CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
unreachable!("We have reset the status above!"),
}
}
}

/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
Expand Down Expand Up @@ -812,7 +843,7 @@ where
let collations = state.collations_per_relay_parent.entry(relay_parent).or_default();

match collations.status {
CollationStatus::Fetching =>
CollationStatus::Fetching | CollationStatus::WaitingOnValidation =>
collations.unfetched_collations.push((pending_collation, id)),
CollationStatus::Waiting => {
collations.status = CollationStatus::Fetching;
Expand Down Expand Up @@ -1024,14 +1055,28 @@ where
}
}
Invalid(parent, candidate_receipt) => {
if state.pending_candidates
.get(&parent)
.map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash))
.unwrap_or_default()
{
if let Some((id, _)) = state.pending_candidates.remove(&parent) {
report_collator(ctx, &state.peer_data, id).await;
let id = match state.pending_candidates.entry(parent) {
Entry::Occupied(entry)
if entry.get().1.commitments_hash == Some(candidate_receipt.commitments_hash) => entry.remove().0,
Entry::Occupied(_) => {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?parent,
candidate = ?candidate_receipt.hash(),
"Reported invalid candidate for unknown `pending_candidate`!",
);
return
}
Entry::Vacant(_) => return,
};

report_collator(ctx, &state.peer_data, id).await;

if let Some((next, id)) = state.collations_per_relay_parent
.get_mut(&parent)
.and_then(|c| c.get_next_collation_to_fetch())
{
fetch_collation(ctx, state, next, id).await;
}
}
}
Expand Down Expand Up @@ -1139,30 +1184,21 @@ async fn handle_collation_fetched_result(
"Failed to fetch collation.",
);

let (next_try, id) = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
if let Some(next_try) = collations.unfetched_collations.pop() {
next_try
} else if matches!(collations.status, CollationStatus::Fetching) {
collations.status = CollationStatus::Waiting;
return
} else {
tracing::error!(
target: LOG_TARGET,
status = ?collations.status,
"Expected status `CollationStatus::Fetching` but got unexpected status."
);
return
}
} else {
return
};

fetch_collation(ctx, state, next_try, id).await;
if let Some((next, id)) = state.collations_per_relay_parent
.get_mut(&relay_parent)
.and_then(|c| c.get_next_collation_to_fetch())
{
fetch_collation(ctx, state, next, id).await;
}

return
},
};

if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) {
collations.status = CollationStatus::WaitingOnValidation;
}

if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) {
collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash);
ctx.send_message(
Expand All @@ -1174,6 +1210,13 @@ async fn handle_collation_fetched_result(
).await;

entry.insert(collation_event);
} else {
tracing::error!(
target: LOG_TARGET,
?relay_parent,
candidate = ?candidate_receipt.hash(),
"Trying to insert a pending candidate failed, because there is already one!",
)
}
}

Expand Down
119 changes: 95 additions & 24 deletions node/network/collator-protocol/src/validator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{
our_view, ObservedRole, request_response::{Requests, ResponseSender},
};

const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50);
const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(500);
const DECLARE_TIMEOUT: Duration = Duration::from_millis(25);

#[derive(Clone)]
Expand Down Expand Up @@ -262,15 +262,16 @@ async fn assert_candidate_backing_second(
expected_relay_parent: Hash,
expected_para_id: ParaId,
expected_pov: &PoV,
) {
) -> CandidateReceipt {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov)
) => {
assert_eq!(expected_relay_parent, relay_parent);
assert_eq!(expected_para_id, candidate_receipt.descriptor.para_id);
assert_eq!(*expected_pov, incoming_pov);
});
candidate_receipt
})
}

/// Assert that a collator got disconnected.
Expand All @@ -290,25 +291,6 @@ async fn assert_collator_disconnect(
);
}

/// Assert that the given collators got disconnected.
async fn assert_collators_disconnect(
virtual_overseer: &mut VirtualOverseer,
expected_peers: &[PeerId],
) {
for _ in expected_peers {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
peer,
peer_set,
)) => {
assert!(expected_peers.contains(&peer), "Unexpected collator disconnected: {:?}", peer);
assert_eq!(PeerSet::Collation, peer_set);
}
);
}
}

/// Assert that a fetch collation request was send.
async fn assert_fetch_collation_request(
virtual_overseer: &mut VirtualOverseer,
Expand Down Expand Up @@ -603,8 +585,6 @@ fn fetch_collations_works() {
&pov,
).await;

assert_collators_disconnect(&mut virtual_overseer, &[peer_b.clone(), peer_c.clone()]).await;

overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer_b.clone())),
Expand Down Expand Up @@ -681,6 +661,97 @@ fn fetch_collations_works() {
});
}

// Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
#[test]
fn fetch_next_collation_on_invalid_collation() {
let test_state = TestState::default();

test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;

let second = Hash::random();

overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent, second])
),
).await;

respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;

let peer_b = PeerId::random();
let peer_c = PeerId::random();

connect_and_declare_collator(
&mut virtual_overseer,
peer_b.clone(),
test_state.collators[0].clone(),
test_state.chain_ids[0].clone(),
).await;

connect_and_declare_collator(
&mut virtual_overseer,
peer_c.clone(),
test_state.collators[1].clone(),
test_state.chain_ids[0].clone(),
).await;

advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
advertise_collation(&mut virtual_overseer, peer_c.clone(), test_state.relay_parent).await;

let response_channel = assert_fetch_collation_request(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
).await;

let pov = PoV { block_data: BlockData(vec![]) };
let mut candidate_a = CandidateReceipt::default();
candidate_a.descriptor.para_id = test_state.chain_ids[0];
candidate_a.descriptor.relay_parent = test_state.relay_parent;
response_channel.send(Ok(
CollationFetchingResponse::Collation(
candidate_a.clone(),
pov.clone(),
).encode()
)).expect("Sending response should succeed");

let receipt = assert_candidate_backing_second(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
&pov,
).await;

// Inform that the candidate was invalid.
overseer_send(&mut virtual_overseer, CollatorProtocolMessage::Invalid(test_state.relay_parent, receipt)).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(
peer,
rep,
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REPORT_BAD);
}
);

// We should see a request for another collation.
assert_fetch_collation_request(
&mut virtual_overseer,
test_state.relay_parent,
test_state.chain_ids[0],
).await;

virtual_overseer
});
}

#[test]
fn inactive_disconnected() {
let test_state = TestState::default();
Expand Down

0 comments on commit 2b855f3

Please sign in to comment.