Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Batch vote import in dispute-distribution #5894

Merged
merged 45 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b202cc8
Start work on batching in dispute-distribution.
eskimor Aug 16, 2022
192d73b
Guide work.
eskimor Aug 17, 2022
a2f8fde
More guide changes. Still very much WIP.
eskimor Aug 18, 2022
5e2f4a5
Finish guide changes.
eskimor Aug 22, 2022
585ec6d
Clarification
eskimor Aug 22, 2022
7e02910
Adjust argument about slashing.
eskimor Aug 23, 2022
4e170e2
WIP: Add constants to receiver.
eskimor Aug 23, 2022
362ebae
Maintain order of disputes.
eskimor Aug 24, 2022
68c7073
dispute-distribuion sender Rate limit.
eskimor Aug 24, 2022
56519a7
Cleanup
eskimor Aug 25, 2022
b3ab280
WIP: dispute-distribution receiver.
eskimor Aug 25, 2022
4b4df00
WIP: Batching.
eskimor Aug 25, 2022
5816c8d
fmt
eskimor Aug 25, 2022
a821efc
Update `PeerQueues` to maintain more invariants.
eskimor Aug 26, 2022
3b71817
WIP: Batching.
eskimor Aug 26, 2022
909569b
Small cleanup
eskimor Aug 26, 2022
fa14c43
Batching logic.
eskimor Aug 30, 2022
89d622d
Some integration work.
eskimor Aug 30, 2022
401ffb8
Finish.
eskimor Aug 31, 2022
3dd2041
Typo.
eskimor Sep 1, 2022
f9fc6c8
Docs.
eskimor Sep 1, 2022
43d946d
Report missing metric.
eskimor Sep 1, 2022
3e5ceaa
Doc pass.
eskimor Sep 1, 2022
0e11f4f
Tests for waiting_queue.
eskimor Sep 2, 2022
2fcebad
Speed up some crypto by 10x.
eskimor Sep 2, 2022
3d444ae
Fix redundant import.
eskimor Sep 2, 2022
e7923a7
Add some tracing.
eskimor Sep 2, 2022
8b8f722
Better sender rate limit
eskimor Sep 2, 2022
a79a96d
Some tests.
eskimor Sep 2, 2022
c1ed615
Tests
eskimor Sep 5, 2022
f4c530c
Add logging to rate limiter
eskimor Sep 5, 2022
795c6c1
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
d46be76
Update roadmap/implementers-guide/src/node/disputes/dispute-distribut…
eskimor Sep 8, 2022
9b8787d
Update node/network/dispute-distribution/src/receiver/mod.rs
eskimor Sep 8, 2022
9795098
Review feedback.
eskimor Sep 9, 2022
465f266
Also log peer in log messages.
eskimor Sep 21, 2022
88c61d2
Fix indentation.
eskimor Sep 28, 2022
78066ac
waker -> timer
eskimor Sep 28, 2022
3e43b0a
Guide improvement.
eskimor Sep 28, 2022
f0e5001
Remove obsolete comment.
eskimor Sep 28, 2022
30d10f4
waker -> timer
eskimor Oct 4, 2022
8ef4a24
Fix spell complaints.
eskimor Oct 4, 2022
5b5d82d
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
f0a054b
Merge branch 'master' into rk-batch-vote-import
eskimor Oct 4, 2022
35d698c
Fix Cargo.lock
eskimor Oct 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/network/dispute-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
derive_more = "0.99.17"
parity-scale-codec = { version = "3.1.5", features = ["std"] }
Expand All @@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
thiserror = "1.0.31"
fatality = "0.0.6"
lru = "0.7.7"
indexmap = "1.9.1"

[dev-dependencies]
async-trait = "0.1.53"
Expand Down
2 changes: 2 additions & 0 deletions node/network/dispute-distribution/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub type Result<T> = std::result::Result<T, Error>;

pub type FatalResult<T> = std::result::Result<T, FatalError>;

pub type JfyiResult<T> = std::result::Result<T, JfyiError>;

/// Utility for eating top level errors and log them.
///
/// We basically always want to try and continue on error. This utility function is meant to
Expand Down
29 changes: 27 additions & 2 deletions node/network/dispute-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].

use std::time::Duration;

use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery;
Expand Down Expand Up @@ -57,6 +59,8 @@ use self::sender::{DisputeSender, TaskFinish};

/// ## The receiver [`DisputesReceiver`]
///
/// TODO: Obsolete:
///
/// The receiving side is implemented as `DisputesReceiver` and is run as a separate long running task within
/// this subsystem ([`DisputesReceiver::run`]).
///
Expand Down Expand Up @@ -93,6 +97,20 @@ pub use metrics::Metrics;

const LOG_TARGET: &'static str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer come in at a higher rate than every `RECEIVE_RATE_LIMIT` on average, we
/// start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit out sending rate as
/// well.
///
/// We add 50ms extra, just to have some save margin to the `RECEIVE_RATE_LIMIT`.
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));

/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
/// Easy and efficient runtime access for this subsystem.
Expand Down Expand Up @@ -172,6 +190,12 @@ where
ctx.spawn("disputes-receiver", receiver.run().boxed())
.map_err(FatalError::SpawnTask)?;

// Process messages for sending side.
//
// Note: We want the sender to be rate limited and we are currently taking advantage of the
// fact that the root task of this subsystem is only concerned with sending: Functions of
// `DisputeSender` might back pressure if the rate limit is hit, which will slow down this
// loop. If this fact ever changes, we will likely need another task.
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
Expand Down Expand Up @@ -247,9 +271,10 @@ impl MuxedMessage {
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
futures::select!(
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
// We select biased to make sure we finish up loose ends, before starting new work.
futures::select_biased!(
eskimor marked this conversation as resolved.
Show resolved Hide resolved
msg = from_sender.next() => MuxedMessage::Sender(msg),
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
)
}
}
Expand Down
202 changes: 202 additions & 0 deletions node/network/dispute-distribution/src/receiver/batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::{Ord, Ordering},
collections::{hash_map, BTreeMap, BinaryHeap, HashMap, HashSet},
time::Instant,
};

use futures_timer::Delay;
use polkadot_node_network_protocol::request_response::{
incoming::OutgoingResponseSender, v1::DisputeRequest,
};
use polkadot_node_primitives::SignedDisputeStatement;
use polkadot_primitives::v2::{CandidateHash, CandidateReceipt, ValidatorIndex};

use super::BATCH_COLLECTING_INTERVAL;

/// TODO: Limit number of batches

// - Batches can be added very rate limit timeout.
// - They have to be checked every BATCH_COLLECTING_INTERVAL.
// - We can get the earliest next wakeup - keep ordered list of wakeups! Then we always know when
// the next one comes - only needs to get updated on insert. - Tada!
struct Batches {
batches: HashMap<CandidateHash, Batch>,
check_waker: Option<Delay>,
pending_wakes: BinaryHeap<PendingWake>,
}

/// Represents some batch waiting for its next tick to happen at `next_tick`.
///
/// This is an internal type meant to be used in the `pending_wakes` `BinaryHeap` field of
/// `Batches`. It provides an `Ord` instance, that sorts descending with regard to `Instant` (so we
/// get a `min-heap` with the earliest `Instant` at the top.
#[derive(Eq, PartialEq)]
struct PendingWake {
candidate_hash: CandidateHash,
next_tick: Instant,
}

/// A found batch is either really found or got created so it can be found.
enum FoundBatch<'a> {
/// Batch just got created.
Created(&'a mut Batch),
/// Batch already existed.
Found(&'a mut Batch),
}

impl Batches {
/// Create new empty `Batches`.
pub fn new() -> Self {
Self { batches: HashMap::new(), check_waker: None, pending_wakes: BinaryHeap::new() }
}

/// Find a particular batch.
///
/// That is either find it, or we create it as reflected by the result `FoundBatch`.
pub fn find_batch(
&mut self,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
) -> FoundBatch {
debug_assert!(candidate_hash == candidate_receipt.hash());
match self.batches.entry(candidate_hash) {
hash_map::Entry::Vacant(vacant) =>
FoundBatch::Created(vacant.insert(Batch::new(candidate_receipt))),
hash_map::Entry::Occupied(occupied) => FoundBatch::Found(occupied.get_mut()),
}
}
// Next steps:
//
// - Make sure binary heap above stays current.
// - Use head of binary heap to schedule next wakeup.
// - Provide funtion that provides imports delayed by the wakeup future (similar to rate
// limiting).
// - Important: Direct updating of last_tick of `Batch` has to be forbidden as this would break
// our binary heap. Instead all updates have to go through `Batches`
}

/// A batch of votes to be imported into the `dispute-coordinator`.
///
/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
/// votes until the rate of incoming votes falls below a threshold, then we import into the dispute
/// coordinator.
///
/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
/// it will "flush" in case the incoming rate dropped too low, preparing the import.
pub struct Batch {
/// The actual candidate this batch is concerned with.
candidate_receipt: CandidateReceipt,

/// Cache `CandidateHash` to do efficient sanity checks.
candidate_hash: CandidateHash,

/// All valid votes received in this batch so far.
///
/// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
/// while still allowing validators to equivocate.
///
/// Detecting and rejecting duplicats is crucial in order to effectively enforce
/// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we would count duplicates
/// here, the mechanism would be broken.
valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

/// All invalid votes received in this batch so far.
invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

/// How many votes have been batched in the last `BATCH_COLLECTING_INTERVAL`?
votes_batched_since_last_tick: u32,

/// Timestamp of creation or last time we checked incoming rate.
last_tick: Instant,

/// Requesters waiting for a response.
pending_responses: Vec<OutgoingResponseSender<DisputeRequest>>,
}

impl Batch {
/// Create a new empty batch based on the given `CandidateReceipt`.
///
/// To create a `Batch` use Batches::find_batch`.
fn new(candidate_receipt: CandidateReceipt) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
valid_votes: HashMap::new(),
invalid_votes: HashMap::new(),
votes_batched_since_last_tick: 0,
last_tick: Instant::now(),
pending_responses: Vec::new(),
}
}

/// Add votes from a validator into the batch.
///
/// The statements are supposed to be the valid and invalid statements received in a
/// `DisputeRequest`.
///
/// The given `pending_response` is the corresponding response sender. If at least one of the
/// votes is new as far as this batch is concerned we record the pending_response, for later
/// use. In case both votes are known already, we return the response sender as an `Err` value.
pub fn add_votes(
&mut self,
valid_vote: (SignedDisputeStatement, ValidatorIndex),
invalid_vote: (SignedDisputeStatement, ValidatorIndex),
pending_response: OutgoingResponseSender<DisputeRequest>,
) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);

let mut duplicate = true;

if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
self.votes_batched_since_last_tick += 1;
duplicate = false;
}
if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
self.votes_batched_since_last_tick += 1;
duplicate = false;
}

if duplicate {
Err(pending_response)
} else {
Ok(())
}
}

/// When the next "tick" is supposed to happen.
fn time_next_tick(&self) -> Instant {
self.last_tick + BATCH_COLLECTING_INTERVAL
}
}

impl PartialOrd<PendingWake> for PendingWake {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PendingWake {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse order for min-heap:
match other.next_tick.cmp(&self.next_tick) {
Ordering::Equal => other.candidate_hash.cmp(&self.candidate_hash),
o => o,
}
}
}
9 changes: 8 additions & 1 deletion node/network/dispute-distribution/src/receiver/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use fatality::Nested;

use polkadot_node_network_protocol::{request_response::incoming, PeerId};
use polkadot_node_subsystem_util::runtime;
use polkadot_primitives::v2::AuthorityDiscoveryId;

use crate::LOG_TARGET;

Expand All @@ -44,16 +45,22 @@ pub enum Error {
#[error("Dispute request with invalid signatures, from peer {0}.")]
InvalidSignature(PeerId),

#[error("Received votes from peer {0} have been completely redundant.")]
RedundantMessage(PeerId),

#[error("Import of dispute got canceled for peer {0} - import failed for some reason.")]
ImportCanceled(PeerId),

#[error("Peer {0} attempted to participate in dispute and is not a validator.")]
NotAValidator(PeerId),

#[error("Authority {0} sent messages at a too high rate.")]
AuthorityFlooding(AuthorityDiscoveryId),
}

pub type Result<T> = std::result::Result<T, Error>;

pub type JfyiErrorResult<T> = std::result::Result<T, JfyiError>;
pub type JfyiResult<T> = std::result::Result<T, JfyiError>;

/// Utility for eating top level errors and log them.
///
Expand Down
Loading