Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Refactor client and client actor to move the code for block processing to client #7898

Merged
merged 8 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 194 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use near_chain::{
use near_chain_configs::ClientConfig;
use near_chunks::ShardsManager;
use near_network::types::{
FullPeerInfo, NetworkClientResponses, NetworkRequests, PeerManagerAdapter,
FullPeerInfo, NetworkClientResponses, NetworkRequests, PeerManagerAdapter, ReasonForBan,
};
use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip};
use near_primitives::challenge::{Challenge, ChallengeBody};
Expand All @@ -54,6 +54,7 @@ use near_network::types::{AccountKeys, ChainInfo, PeerManagerMessageRequest, Set
use near_o11y::{log_assert, WithSpanContextExt};
use near_primitives::block_header::ApprovalType;
use near_primitives::epoch_manager::RngSeed;
use near_primitives::network::PeerId;
use near_primitives::version::PROTOCOL_VERSION;
use near_primitives::views::CatchupStatusView;

Expand All @@ -65,6 +66,8 @@ pub const EPOCH_SYNC_REQUEST_TIMEOUT: Duration = Duration::from_millis(1_000);
/// How frequently a Epoch Sync response can be sent to a particular peer
// TODO #3488 set 60_000
pub const EPOCH_SYNC_PEER_TIMEOUT: Duration = Duration::from_millis(10);
/// Drop blocks whose height are beyond head + horizon if it is not in the current epoch.
const BLOCK_HORIZON: u64 = 500;

/// number of blocks at the epoch start for which we will log more detailed info
pub const EPOCH_START_INFO_BLOCKS: u64 = 500;
Expand Down Expand Up @@ -807,6 +810,162 @@ impl Client {
}
}

/// Processes received block. Ban peer if the block header is invalid or the block is ill-formed.
// This function is just a wrapper for process_block_impl that makes error propagation easier.
pub fn process_block(
mzhangmzz marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
block: Block,
peer_id: PeerId,
was_requested: bool,
apply_chunks_done_callback: DoneApplyChunkCallback,
) {
let hash = *block.hash();
let prev_hash = *block.header().prev_hash();
let _span = tracing::debug_span!(
target: "client",
"process_block",
mzhangmzz marked this conversation as resolved.
Show resolved Hide resolved
me = ?self.validator_signer.as_ref().map(|vs| vs.validator_id()),
prev_hash = %prev_hash,
mzhangmzz marked this conversation as resolved.
Show resolved Hide resolved
%hash,
height = block.header().height(),
%peer_id,
was_requested)
.entered();

match self.process_block_impl(
block,
peer_id.clone(),
was_requested,
apply_chunks_done_callback,
) {
Ok(_) => {}
// Log the errors here. Note that the real error handling logic is already
// done within process_block_impl, this is just for logging.
mzhangmzz marked this conversation as resolved.
Show resolved Hide resolved
Err(ref err) if err.is_bad_data() => {
warn!(target: "client", "Receive bad block: {}", err);
}
Err(ref err) if err.is_error() => {
if let near_chain::Error::DBNotFoundErr(msg) = err {
debug_assert!(!msg.starts_with("BLOCK HEIGHT"), "{:?}", err);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-existing sketchiness, iffing on error messages is yuck

if self.sync_status.is_syncing() {
// While syncing, we may receive blocks that are older or from next epochs.
// This leads to Old Block or EpochOutOfBounds errors.
debug!(target: "client", "Error on receival of block: {}", err);
} else {
error!(target: "client", "Error on receival of block: {}", err);
}
}
Err(e) => {
debug!(target: "client", error = %e, "Process block: refused by chain");
}
}
}

/// Processes received block. Ban peer if the block header is invalid or the block is ill-formed.
pub fn process_block_impl(
mzhangmzz marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
block: Block,
peer_id: PeerId,
was_requested: bool,
apply_chunks_done_callback: DoneApplyChunkCallback,
) -> Result<(), near_chain::Error> {
let prev_hash = *block.header().prev_hash();
matklad marked this conversation as resolved.
Show resolved Hide resolved
// To protect ourselves from spamming, we do some pre-check on block height before we do any
// real processing.
if !self.check_block_height(&block, was_requested)? {
return Ok(());
}
let block = block.into();
self.verify_and_rebroadcast_block(&block, was_requested, &peer_id)?;
let provenance =
if was_requested { near_chain::Provenance::SYNC } else { near_chain::Provenance::NONE };
let res = self.start_process_block(block.into(), provenance, apply_chunks_done_callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let res = self.start_process_block(block.into(), provenance, apply_chunks_done_callback);
let res = self.start_process_block(block, provenance, apply_chunks_done_callback);

match &res {
Err(near_chain::Error::Orphan) => {
if !self.chain.is_orphan(&prev_hash) {
self.request_block(prev_hash, peer_id)
}
}
_ => {}
}
res
}

/// To protect ourselves from spamming, we do some pre-check on block height before we do any
/// processing. This function returns true if the block height is valid.
fn check_block_height(
&self,
block: &Block,
was_requested: bool,
) -> Result<bool, near_chain::Error> {
let head = self.chain.head()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, so here we replace unwrap_or_returns with just ?, right? Looks correct to me, given that we just log the stuff at the call-side. Also, major kuros for removing unwrap_or_return -- it was making the code quite hard to read!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

let is_syncing = self.sync_status.is_syncing();
if block.header().height() >= head.height + BLOCK_HORIZON && is_syncing && !was_requested {
debug!(target: "client", head_height = head.height, "Dropping a block that is too far ahead.");
return Ok(false);
}
let tail = self.chain.tail()?;
if block.header().height() < tail {
debug!(target: "client", tail_height = tail, "Dropping a block that is too far behind.");
return Ok(false);
}
// drop the block if a) it is not requested, b) we already processed this height, c) it is not building on top of current head
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap this comment, its too too long, https://plugins.jetbrains.com/plugin/7234-wrap-to-column

if !was_requested
&& block.header().prev_hash()
!= &self
.chain
.head()
.map_or_else(|_| CryptoHash::default(), |tip| tip.last_block_hash)
{
if self.chain.is_height_processed(block.header().height())? {
debug!(target: "client", height = block.header().height(), "Dropping a block because we've seen this height before and we didn't request it");
return Ok(false);
}
}
Ok(true)
}

/// Verify the block and rebroadcast it if it is valid, ban the peer if it's invalid.
/// Ignore all other errors because the full block will be processed later.
/// Note that this happens before the full block processing logic because we want blocks to be
/// propagated in the network fast.
fn verify_and_rebroadcast_block(
&mut self,
block: &MaybeValidated<Block>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, the fact that we statically know that the block is not validated here, but is validatde afterwards, gives me a pause. It seems that, eg, start_process_block always receives a validated block?

Not sure if there's anythnig to fixhere, and that's definietly out of scope for the pr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I had the same thoughts too and also agree that it's out of the scope of this PR. I will create a github issue and look more into this.

was_requested: bool,
peer_id: &PeerId,
) -> Result<(), near_chain::Error> {
let res = self.chain.process_block_header(block.header(), &mut vec![]);
let res = res.and_then(|_| self.chain.validate_block(block));
match res {
Ok(_) => {
let head = self.chain.head()?;
// do not broadcast blocks that are too far back.
if (head.height < block.header().height()
|| &head.epoch_id == block.header().epoch_id())
&& !was_requested
&& !self.sync_status.is_syncing()
{
self.rebroadcast_block(block.as_ref().into_inner());
}
Ok(())
}
Err(e) if e.is_bad_data() => {
self.ban_peer(peer_id.clone(), ReasonForBan::BadBlockHeader);
Err(e)
}
Err(_) => {
// We are ignoring all other errors and proceeding with the
// block. If it is an orphan (i.e. we haven’t processed its
// previous block) than we will get MissingBlock errors. In
// those cases we shouldn’t reject the block instead passing
// it along. Eventually, it’ll get saved as an orphan.
Ok(())
}
}
}

/// Start the processing of a block. Note that this function will return before
/// the full processing is finished because applying chunks is done asynchronously
/// in the rayon thread pool.
Expand Down Expand Up @@ -932,7 +1091,7 @@ impl Client {
self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
}

pub fn rebroadcast_block(&mut self, block: &Block) {
fn rebroadcast_block(&mut self, block: &Block) {
if self.rebroadcasted_blocks.get(block.hash()).is_none() {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Block {
Expand Down Expand Up @@ -1868,6 +2027,39 @@ impl Client {
}
}

/* implements functions used to communicate with network */
impl Client {
pub fn request_block(&self, hash: CryptoHash, peer_id: PeerId) {
match self.chain.block_exists(&hash) {
Ok(false) => {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::BlockRequest {
hash,
peer_id,
})
.with_span_context(),
);
}
Ok(true) => {
debug!(target: "client", "send_block_request_to_peer: block {} already known", hash)
}
Err(e) => {
error!(target: "client", "send_block_request_to_peer: failed to check block exists: {:?}", e)
}
}
}

pub fn ban_peer(&self, peer_id: PeerId, ban_reason: ReasonForBan) {
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::BanPeer {
peer_id,
ban_reason,
})
.with_span_context(),
);
}
}

impl Client {
/// Each epoch defines a set of important accounts: block producers, chunk producers,
/// approvers. Low-latency reliable communication between those accounts is critical,
Expand Down
Loading