Skip to content

Commit

Permalink
Run request-response consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Oct 17, 2024
1 parent 5ecc6f2 commit c33f20f
Showing 1 changed file with 59 additions and 56 deletions.
115 changes: 59 additions & 56 deletions substrate/client/network/benches/request_response_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use sc_network::{
config::{
FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId, NetworkConfiguration,
NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, Params, ProtocolId, Role,
FullNetworkConfiguration, IncomingRequest, NetworkConfiguration, NonDefaultSetConfig,
NonReservedPeerMode, NotificationHandshake, OutgoingResponse, Params, ProtocolId, Role,
SetConfig,
},
service::traits::NotificationEvent,
IfDisconnected, NetworkBackend, NetworkRequest, NetworkWorker, NotificationMetrics,
NotificationService, Roles, MAX_RESPONSE_SIZE,
};
use sc_network_common::sync::message::BlockAnnouncesHandshake;
use sc_network_sync::service::network::Network;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Zero;
use std::{sync::Arc, time::Duration};
use substrate_test_runtime_client::{runtime, TestClientBuilder, TestClientBuilderExt};
use std::time::Duration;
use substrate_test_runtime_client::runtime;

pub fn dummy_block_announce_config() -> NonDefaultSetConfig {
let (block_announce_config, _notification_service) = NonDefaultSetConfig::new(
pub fn create_network_worker() -> (
NetworkWorker<runtime::Block, runtime::Hash>,
async_channel::Receiver<IncomingRequest>,
Box<dyn NotificationService>,
) {
let (tx, rx) = async_channel::bounded(10);
let request_response_config =
NetworkWorker::<runtime::Block, runtime::Hash>::request_response_config(
"/request-response/1".into(),
vec![],
MAX_RESPONSE_SIZE,
MAX_RESPONSE_SIZE,
Duration::from_secs(2),
Some(tx),
);
let mut network_config =
FullNetworkConfiguration::new(&NetworkConfiguration::new_local(), None);
network_config.add_request_response_protocol(request_response_config);
let (block_announce_config, notification_service) = NonDefaultSetConfig::new(
"/block-announces/1".into(),
vec![],
1024,
Expand All @@ -34,30 +48,12 @@ pub fn dummy_block_announce_config() -> NonDefaultSetConfig {
non_reserved_mode: NonReservedPeerMode::Accept,
},
);
block_announce_config
}

pub fn create_network_worker(
) -> (NetworkWorker<runtime::Block, runtime::Hash>, async_channel::Receiver<IncomingRequest>) {
let (tx, rx) = async_channel::bounded(10);
let request_response_config =
NetworkWorker::<runtime::Block, runtime::Hash>::request_response_config(
"request_response".into(),
vec![],
MAX_RESPONSE_SIZE,
MAX_RESPONSE_SIZE,
Duration::from_secs(2),
Some(tx),
);
let mut network_config =
FullNetworkConfiguration::new(&NetworkConfiguration::new_local(), None);
network_config.add_request_response_protocol(request_response_config);
let worker = NetworkWorker::<runtime::Block, runtime::Hash>::new(Params::<
runtime::Block,
runtime::Hash,
NetworkWorker<_, _>,
> {
block_announce_config: dummy_block_announce_config(),
block_announce_config,
role: Role::Full,
executor: Box::new(|f| {
tokio::spawn(f);
Expand All @@ -72,7 +68,7 @@ pub fn create_network_worker(
})
.unwrap();

(worker, rx)
(worker, rx, notification_service)
}

async fn get_listen_address(
Expand All @@ -87,42 +83,49 @@ async fn get_listen_address(
}
}

async fn run(size: usize, limit: usize) {
async fn run_consistently(size: usize, limit: usize) {
let mut received_counter = 0;
let (mut worker1, rx1) = create_network_worker();
let (mut worker1, _rx1, _notification_service1) = create_network_worker();
let service1 = worker1.service().clone();
let (mut worker2, rx2) = create_network_worker();
let (mut worker2, rx2, _notification_service2) = create_network_worker();
let peer_id2 = *worker2.local_peer_id();
let listen_address2 = get_listen_address(&mut worker2).await;

worker1.add_known_address(peer_id2, listen_address2);

tokio::spawn(worker1.run());
tokio::spawn(worker2.run());
let (tx, rx) = futures::channel::oneshot::channel();

service1.start_request(
peer_id2.into(),
"request_response".into(),
vec![0; 8],
None,
tx,
IfDisconnected::TryConnect,
);

let xxx = rx.await;
println!("xxx {:?}", xxx);
let requests = async move {
while received_counter < limit {
received_counter += 1;
let _ = service1
.request(
peer_id2.into(),
"/request-response/1".into(),
vec![0; 8],
None,
IfDisconnected::TryConnect,
)
.await
.unwrap();
}
};
let network1_run = worker1.run();
let network2_run = worker2.run();
tokio::pin!(requests);
tokio::pin!(network1_run);
tokio::pin!(network2_run);

loop {
tokio::select! {
// res = rx => {
// println!("req {:?}", res);
// },
x1 = rx1.recv() => {
println!("1 {:?}", x1);
},
x2 = rx2.recv() => {
println!("2 {:?}", x2);
_ = &mut network1_run => {},
_ = &mut network2_run => {},
_ = &mut requests => break,
res = rx2.recv() => {
let IncomingRequest { pending_response, .. } = res.unwrap();
pending_response.send(OutgoingResponse {
result: Ok(vec![0; size]),
reputation_changes: vec![],
sent_feedback: None,
}).unwrap();
}
}
}
Expand All @@ -140,7 +143,7 @@ fn run_benchmark(c: &mut Criterion) {
format!("{}/{}", notifications, size),
&(size, notifications),
|b, &(size, limit)| {
b.to_async(&rt).iter(|| run(size, limit));
b.to_async(&rt).iter(|| run_consistently(size, limit));
},
);
}
Expand Down

0 comments on commit c33f20f

Please sign in to comment.