Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Introduce OpaqueBlockRequest/OpaqueBlockResponse, make scheme m…
Browse files Browse the repository at this point in the history
…odule of `sc-network-sync` private
  • Loading branch information
nazar-pc committed May 14, 2022
1 parent d6d0a46 commit a91cd02
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 115 deletions.
40 changes: 39 additions & 1 deletion client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub mod message;
pub mod warp;

use libp2p::PeerId;
use message::{BlockAnnounce, BlockRequest, BlockResponse};
use message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sp_consensus::BlockOrigin;
use sp_runtime::{
Expand Down Expand Up @@ -197,6 +197,28 @@ impl fmt::Debug for OpaqueStateResponse {
}
}

/// Wrapper for implementation-specific block request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueBlockRequest(pub Box<dyn Any + Send>);

impl fmt::Debug for OpaqueBlockRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueBlockRequest").finish()
}
}

/// Wrapper for implementation-specific block response.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
pub struct OpaqueBlockResponse(pub Box<dyn Any + Send>);

impl fmt::Debug for OpaqueBlockResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("OpaqueBlockResponse").finish()
}
}

pub trait ChainSync<Block: BlockT>: Send {
/// Returns the state of the sync of the given peer.
///
Expand Down Expand Up @@ -358,6 +380,22 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Return some key metrics.
fn metrics(&self) -> Metrics;

/// Create implementation-specific block request.
fn create_opaque_block_request(&self, request: &BlockRequest<Block>) -> OpaqueBlockRequest;

/// Encode implementation-specific block request.
fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String>;

/// Decode implementation-specific block response.
fn decode_block_response(&self, response: &[u8]) -> Result<OpaqueBlockResponse, String>;

/// Access blocks from implementation-specific block response.
fn block_response_into_blocks(
&self,
request: &BlockRequest<Block>,
response: OpaqueBlockResponse,
) -> Result<Vec<BlockData<Block>>, String>;

/// Encode implementation-specific state request.
fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String>;

Expand Down
35 changes: 18 additions & 17 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use libp2p::{
NetworkBehaviour,
};
use log::debug;
use prost::Message;

use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::import_queue::{IncomingBlock, Origin};
use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig};
Expand Down Expand Up @@ -382,23 +382,24 @@ where
.events
.push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
CustomMessageOutcome::BlockRequest { target, request, pending_response } => {
let mut buf = Vec::with_capacity(request.encoded_len());
if let Err(err) = request.encode(&mut buf) {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
request, err
);
return
match self.substrate.encode_block_request(&request) {
Ok(data) => {
self.request_responses.send_request(
&target,
&self.block_request_protocol_name,
data,
pending_response,
IfDisconnected::ImmediateError,
);
},
Err(err) => {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
request, err
);
},
}

self.request_responses.send_request(
&target,
&self.block_request_protocol_name,
buf,
pending_response,
IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::StateRequest { target, request, pending_response } => {
match self.substrate.encode_state_request(&request) {
Expand Down
133 changes: 41 additions & 92 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ use message::{
};
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use prost::Message as _;
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin};
use sc_network_common::{
config::ProtocolId,
sync::{
message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState,
FromBlock,
},
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData,
PollBlockAnnounceValidation, SyncStatus,
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest,
OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation,
SyncStatus,
},
warp_sync::{EncodedProof, WarpProofRequest},
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_blockchain::HeaderMetadata;
use sp_consensus::BlockOrigin;
use sp_runtime::{
generic::BlockId,
Expand All @@ -83,8 +83,6 @@ pub mod event;
pub mod message;

pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
use sc_network_common::sync::{OpaqueStateRequest, OpaqueStateResponse};
use sp_blockchain::HeaderMetadata;

/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
Expand Down Expand Up @@ -564,62 +562,9 @@ where
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
response: sc_network_sync::schema::v1::BlockResponse,
response: OpaqueBlockResponse,
) -> CustomMessageOutcome<B> {
let blocks = response
.blocks
.into_iter()
.map(|block_data| {
Ok(BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if request.fields.contains(BlockAttributes::BODY) {
Some(
block_data
.body
.iter()
.map(|body| Decode::decode(&mut body.as_ref()))
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
},
indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
Some(block_data.indexed_body)
} else {
None
},
receipt: if !block_data.receipt.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
justifications: if !block_data.justifications.is_empty() {
Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?)
} else {
None
},
})
})
.collect::<Result<Vec<_>, codec::Error>>();

let blocks = match blocks {
let blocks = match self.chain_sync.block_response_into_blocks(&request, response) {
Ok(blocks) => blocks,
Err(err) => {
debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
Expand Down Expand Up @@ -664,7 +609,7 @@ where
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(&mut self.peers, peer, req),
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down Expand Up @@ -829,8 +774,12 @@ where
.push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number));

if let Some(req) = req {
self.pending_messages
.push_back(prepare_block_request(&mut self.peers, who, req));
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
who,
req,
));
}

Ok(())
Expand Down Expand Up @@ -997,7 +946,7 @@ where
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(&mut self.peers, peer, req),
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down Expand Up @@ -1051,6 +1000,7 @@ where
match result {
Ok((id, req)) => {
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
id,
req,
Expand Down Expand Up @@ -1190,6 +1140,11 @@ where
}
}

/// Encode implementation-specific block request.
pub fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_block_request(request)
}

/// Encode implementation-specific state request.
pub fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_state_request(request)
Expand Down Expand Up @@ -1226,6 +1181,7 @@ where
}

fn prepare_block_request<B: BlockT>(
chain_sync: &dyn ChainSync<B>,
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: BlockRequest<B>,
Expand All @@ -1236,19 +1192,7 @@ fn prepare_block_request<B: BlockT>(
peer.request = Some((PeerRequest::Block(request.clone()), rx));
}

let request = sc_network_sync::schema::v1::BlockRequest {
fields: request.fields.to_be_u32(),
from_block: match request.from {
FromBlock::Hash(h) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Hash(h.encode())),
FromBlock::Number(n) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Number(n.encode())),
},
to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
direction: request.direction as i32,
max_blocks: request.max.unwrap_or(0),
support_multiple_justifications: true,
};
let request = chain_sync.create_opaque_block_request(&request);

CustomMessageOutcome::BlockRequest { target: who, request, pending_response: tx }
}
Expand Down Expand Up @@ -1313,7 +1257,7 @@ pub enum CustomMessageOutcome<B: BlockT> {
/// A new block request must be emitted.
BlockRequest {
target: PeerId,
request: sc_network_sync::schema::v1::BlockRequest,
request: OpaqueBlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new storage request must be emitted.
Expand Down Expand Up @@ -1422,10 +1366,8 @@ where
let (req, _) = peer.request.take().unwrap();
match req {
PeerRequest::Block(req) => {
let protobuf_response =
match sc_network_sync::schema::v1::BlockResponse::decode(
&resp[..],
) {
let response =
match self.chain_sync.decode_block_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
Expand All @@ -1441,7 +1383,7 @@ where
},
};

finished_block_requests.push((*id, req, protobuf_response));
finished_block_requests.push((*id, req, response));
},
PeerRequest::State => {
let response =
Expand Down Expand Up @@ -1520,12 +1462,12 @@ where
}
}
}
for (id, req, protobuf_response) in finished_block_requests {
let ev = self.on_block_response(id, req, protobuf_response);
for (id, req, response) in finished_block_requests {
let ev = self.on_block_response(id, req, response);
self.pending_messages.push_back(ev);
}
for (id, protobuf_response) in finished_state_requests {
let ev = self.on_state_response(id, protobuf_response);
for (id, response) in finished_state_requests {
let ev = self.on_state_response(id, response);
self.pending_messages.push_back(ev);
}
for (id, response) in finished_warp_sync_requests {
Expand All @@ -1537,16 +1479,23 @@ where
self.tick();
}

for (id, request) in self.chain_sync.block_requests() {
let event = prepare_block_request(&mut self.peers, *id, request);
for (id, request) in self
.chain_sync
.block_requests()
.map(|(peer_id, request)| (*peer_id, request))
.collect::<Vec<_>>()
{
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.chain_sync.state_request() {
let event = prepare_state_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
for (id, request) in self.chain_sync.justification_requests() {
let event = prepare_block_request(&mut self.peers, id, request);
for (id, request) in self.chain_sync.justification_requests().collect::<Vec<_>>() {
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.chain_sync.warp_sync_request() {
Expand Down
Loading

0 comments on commit a91cd02

Please sign in to comment.