Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Allow to benchmark HRMP messages from the undying collator #6261

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rework collator functor generator
  • Loading branch information
vstakhov committed Mar 28, 2023
commit e1803f59b137e3649abf4275185964ad58d19d54
129 changes: 73 additions & 56 deletions parachain/test-parachains/undying/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use polkadot_node_primitives::{
};
use polkadot_primitives::{CollatorId, CollatorPair, Hash, OutboundHrmpMessage};
use sp_core::Pair;
use std::collections::BTreeMap;
use std::{
collections::HashMap,
sync::{
Expand Down Expand Up @@ -193,11 +194,12 @@ pub struct Collator {
state: Arc<Mutex<State>>,
key: CollatorPair,
seconded_collations: Arc<AtomicU32>,
para_id: ParaId,
}

impl Default for Collator {
fn default() -> Self {
Self::new(DEFAULT_POV_SIZE, DEFAULT_PVF_COMPLEXITY, Vec::new())
Self::new(DEFAULT_POV_SIZE, DEFAULT_PVF_COMPLEXITY, Vec::new(), 0)
}
}

Expand All @@ -208,6 +210,7 @@ impl Collator {
pov_size: usize,
pvf_complexity: u32,
hrmp_channels: Vec<HrmpChannelConfiguration>,
para_id: u32,
) -> Self {
let graveyard_size = ((pov_size / std::mem::size_of::<u8>()) as f64).sqrt().ceil() as usize;

Expand All @@ -228,6 +231,7 @@ impl Collator {
))),
key: CollatorPair::generate().0,
seconded_collations: Arc::new(AtomicU32::new(0)),
para_id: para_id.into(),
}
}

Expand Down Expand Up @@ -269,66 +273,79 @@ impl Collator {

let state = self.state.clone();
let seconded_collations = self.seconded_collations.clone();
let para_id = self.para_id;

Box::new(move |relay_parent, validation_data| {
let relay_chain_interface = relay_chain_interface.clone();
let state = state.clone();
let seconded_collations = seconded_collations.clone();
let spawner = spawner.clone();
let relay_parent_number = validation_data.relay_parent_number;
let parent = HeadData::decode(&mut &validation_data.parent_head.0[..])
.expect("Decodes parent head");

let (block_data, head_data, messages) = state.lock().unwrap().advance(parent);

log::info!(
"created a new collation on relay-parent({}): {:?}",
relay_parent,
head_data,
);

// The pov is the actually the initial state and the transactions.
let pov = PoV { block_data: block_data.encode().into() };

let collation = Collation {
upward_messages: Default::default(),
horizontal_messages: messages,
new_validation_code: None,
head_data: head_data.encode().into(),
proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
processed_downward_messages: 0,
hrmp_watermark: validation_data.relay_parent_number,
};

log::info!("Raw PoV size for collation: {} bytes", pov.block_data.0.len(),);
let compressed_pov = maybe_compress_pov(pov);

log::info!(
"Compressed PoV size for collation: {} bytes",
compressed_pov.block_data.0.len(),
);

let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
let seconded_collations = seconded_collations.clone();
spawner.spawn(
"undying-collator-seconded",
None,
async move {
if let Ok(res) = recv.await {
if !matches!(
res.statement.payload(),
Statement::Seconded(s) if s.descriptor.pov_hash == compressed_pov.hash(),
) {
log::error!(
"Seconded statement should match our collation: {:?}",
res.statement.payload()
);
std::process::exit(-1);
async move {
let inbound_messages = relay_chain_interface
.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
.await
.unwrap_or_else(|_| BTreeMap::new())
.values();

let (block_data, head_data, messages) = state.lock().unwrap().advance(parent);

log::info!(
"created a new collation on relay-parent({}): {:?}",
relay_parent,
head_data,
);

// The pov is the actually the initial state and the transactions.
let pov = PoV { block_data: block_data.encode().into() };

let collation = Collation {
upward_messages: Default::default(),
horizontal_messages: messages,
new_validation_code: None,
head_data: head_data.encode().into(),
proof_of_validity: MaybeCompressedPoV::Raw(pov.clone()),
processed_downward_messages: 0,
hrmp_watermark: relay_parent_number,
};

log::info!("Raw PoV size for collation: {} bytes", pov.block_data.0.len(),);
let compressed_pov = maybe_compress_pov(pov);

log::info!(
"Compressed PoV size for collation: {} bytes",
compressed_pov.block_data.0.len(),
);

let (result_sender, recv) = oneshot::channel::<CollationSecondedSignal>();
let seconded_collations = seconded_collations.clone();
spawner.spawn(
"undying-collator-seconded",
None,
async move {
if let Ok(res) = recv.await {
if !matches!(
res.statement.payload(),
Statement::Seconded(s) if s.descriptor.pov_hash == compressed_pov.hash(),
) {
log::error!(
"Seconded statement should match our collation: {:?}",
res.statement.payload()
);
std::process::exit(-1);
}

seconded_collations.fetch_add(1, Ordering::Relaxed);
}

seconded_collations.fetch_add(1, Ordering::Relaxed);
}
}
.boxed(),
);

async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }
.boxed()
.boxed(),
);
Some(CollationResult { collation, result_sender: Some(result_sender) })
}
.boxed()
})
}

Expand All @@ -341,7 +358,7 @@ impl Collator {
let current_block = self.state.lock().unwrap().best_block;

if start_block + blocks <= current_block {
return
return;
}
}
}
Expand All @@ -356,7 +373,7 @@ impl Collator {
Delay::new(Duration::from_secs(1)).await;

if seconded <= seconded_collations.load(Ordering::Relaxed) {
return
return;
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions parachain/test-parachains/undying/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ fn main() -> Result<()> {
Some(cli::Subcommand::ExportGenesisState(params)) => {
// `pov_size` and `pvf_complexity` need to match the ones that we start the collator
// with.
let collator =
Collator::new(params.pov_size, params.pvf_complexity, params.hrmp_params);
let collator = Collator::new(
params.pov_size,
params.pvf_complexity,
params.hrmp_params,
params.parachain_id,
);
println!("0x{:?}", HexDisplay::from(&collator.genesis_head()));

Ok::<_, Error>(())
Expand All @@ -56,8 +60,12 @@ fn main() -> Result<()> {
})?;

runner.run_node_until_exit(|config| async move {
let collator =
Collator::new(cli.run.pov_size, cli.run.pvf_complexity, cli.run.hrmp_params);
let collator = Collator::new(
cli.run.pov_size,
cli.run.pvf_complexity,
cli.run.hrmp_params,
cli.run.parachain_id,
);

let full_node = polkadot_service::build_full(
config,
Expand Down