Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all batches related to a peer on disconnect #5969

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
92a6e77
Remove all batches related to a peer on disconnect
pawanjay176 Jun 20, 2024
9d90e39
Cleanup map entries after disconnect
pawanjay176 Jun 21, 2024
70af7d1
Allow lookups to continue in case of disconnections
pawanjay176 Jun 21, 2024
312be2c
Pretty response types
pawanjay176 Jun 21, 2024
a38caf4
fmt
pawanjay176 Jun 21, 2024
c7fd21d
Fix lints
pawanjay176 Jun 24, 2024
a8f64f2
Remove lookup if it cannot progress
pawanjay176 Jun 24, 2024
bc10fb2
Fix tests
pawanjay176 Jun 24, 2024
cd17f9f
Remove poll_close on rpc behaviour
pawanjay176 Jun 24, 2024
6ead176
Remove redundant test
pawanjay176 Jun 24, 2024
3e1c41a
Fix issue raised by lion
pawanjay176 Jun 24, 2024
dfaf238
Revert pretty response types
pawanjay176 Jun 24, 2024
b930c7c
Cleanup
pawanjay176 Jun 24, 2024
62167fb
Fix test
pawanjay176 Jun 24, 2024
0c8efc1
Merge remote-tracking branch 'origin/release-v5.2.1' into rpc-error-o…
michaelsproul Jun 25, 2024
26614f1
Apply suggestions from joao
pawanjay176 Jun 25, 2024
ec90bf0
Fix log
pawanjay176 Jun 25, 2024
e79c71a
update request status on no peers found
pawanjay176 Jun 25, 2024
caee7c8
Do not remove lookup after peer disconnection
pawanjay176 Jun 25, 2024
804f36d
Add comments about expected event api
dapplion Jun 25, 2024
9b2e9e0
Update single_block_lookup.rs
dapplion Jun 25, 2024
cd68550
Update mod.rs
dapplion Jun 25, 2024
5b0fbd9
Merge branch 'rpc-error-on-disconnect-revert' into 5969-review
dapplion Jun 25, 2024
59468f8
Merge pull request #10 from dapplion/5969-review
pawanjay176 Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Pretty response types
  • Loading branch information
pawanjay176 committed Jun 21, 2024
commit 312be2c492536ccfc238b87fd688208b561484f9
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use self::parent_chain::{compute_parent_chains, NodeChain};
pub use self::single_block_lookup::DownloadResult;
use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup};
use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE};
use super::network_context::{RpcResponseResult, SyncNetworkContext};
use super::network_context::{RpcResponseError, SyncNetworkContext};
use crate::metrics;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
Expand Down Expand Up @@ -337,7 +337,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
response: RpcResponseResult<R::VerifiedResponseType>,
response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>,
cx: &mut SyncNetworkContext<T>,
) {
let result = self.on_download_response_inner::<R>(id, peer_id, response, cx);
Expand All @@ -349,7 +349,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
&mut self,
id: SingleLookupReqId,
peer_id: PeerId,
response: RpcResponseResult<R::VerifiedResponseType>,
response: Result<(R::VerifiedResponseType, Duration), RpcResponseError>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
// Note: no need to downscore peers here, already downscored on network context
Expand Down
50 changes: 29 additions & 21 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@

use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext};
use super::network_context::{
BlockOrBlob, RangeRequestId, RpcEvent, RpcResponseResult, SyncNetworkContext,
};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
Expand Down Expand Up @@ -860,21 +862,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) {
self.block_lookups
match self.network.on_single_block_response(id, peer_id, block) {
RpcResponseResult::Response(resp) => self
.block_lookups
.on_download_response::<BlockRequestState<T::EthSpec>>(
id,
peer_id,
resp,
&mut self.network,
)
} else {
debug!(
self.log,
"RPC error for block lookup has no associated entry in network context, ungraceful disconnect";
"peer_id" => %peer_id,
"request_id" => ?id,
);
),
RpcResponseResult::RequestNotFound => {
debug!(
self.log,
"RPC error for block lookup has no associated entry in network context, ungraceful disconnect";
"peer_id" => %peer_id,
"request_id" => ?id,
);
}
RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {}
}
}

Expand Down Expand Up @@ -909,21 +914,24 @@ impl<T: BeaconChainTypes> SyncManager<T> {
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) {
if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) {
self.block_lookups
match self.network.on_single_blob_response(id, peer_id, blob) {
RpcResponseResult::Response(resp) => self
.block_lookups
.on_download_response::<BlobRequestState<T::EthSpec>>(
id,
peer_id,
resp,
&mut self.network,
)
} else {
debug!(
self.log,
"RPC error for blob lookup has no associated entry in network context, ungraceful disconnect";
"peer_id" => %peer_id,
"request_id" => ?id,
);
),
RpcResponseResult::RequestNotFound => {
debug!(
self.log,
"RPC error for blob lookup has no associated entry in network context, ungraceful disconnect";
"peer_id" => %peer_id,
"request_id" => ?id,
);
}
RpcResponseResult::NoOp | RpcResponseResult::StreamTermination => {}
}
}

Expand Down
39 changes: 22 additions & 17 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ pub enum RpcEvent<T> {
RPCError(RPCError),
}

pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
/// Outcome of feeding an [`RpcEvent`] for a single-lookup request into the
/// network context (`on_single_block_response` / `on_single_blob_response`).
///
/// Unlike a bare `Option<Result<..>>`, each "nothing to forward" case gets its
/// own variant so callers can tell them apart (e.g. to log only when the
/// request was unknown).
pub enum RpcResponseResult<T> {
/// A result to forward downstream: either the verified value together with
/// the timestamp carried by the response event, or the error that ended the
/// request.
Response(Result<(T, Duration), RpcResponseError>),
/// The stream terminated and the request's `terminate()` returned `Ok`;
/// there is nothing further to forward for this request.
StreamTermination,
/// No active request entry exists for the given request id.
RequestNotFound,
/// The event was consumed but produced nothing to forward: the blob handler
/// yielded `Ok(None)` for the response, or an error arrived for a request
/// that was already resolved.
NoOp,
}

pub enum RpcResponseError {
RpcError(RPCError),
Expand Down Expand Up @@ -582,46 +587,46 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request_id: SingleLookupReqId,
peer_id: PeerId,
block: RpcEvent<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> Option<RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>>> {
) -> RpcResponseResult<Arc<SignedBeaconBlock<T::EthSpec>>> {
let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else {
return None;
return RpcResponseResult::RequestNotFound;
};

let resp = match block {
RpcEvent::Response(block, seen_timestamp) => {
match request.get_mut().add_response(block) {
Ok(block) => Ok((block, seen_timestamp)),
Ok(block) => RpcResponseResult::Response(Ok((block, seen_timestamp))),
Err(e) => {
// The request must be dropped after receiving an error.
request.remove();
Err(e.into())
RpcResponseResult::Response(Err(e.into()))
}
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
Err(e) => Err(e.into()),
Ok(_) => return RpcResponseResult::StreamTermination,
Err(e) => RpcResponseResult::Response(Err(e.into())),
},
RpcEvent::RPCError(e) => {
request.remove();
Err(e.into())
RpcResponseResult::Response(Err(e.into()))
}
};

if let Err(RpcResponseError::VerifyError(e)) = &resp {
if let RpcResponseResult::Response(Err(RpcResponseError::VerifyError(e))) = &resp {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
Some(resp)
resp
}

pub fn on_single_blob_response(
&mut self,
request_id: SingleLookupReqId,
peer_id: PeerId,
blob: RpcEvent<Arc<BlobSidecar<T::EthSpec>>>,
) -> Option<RpcResponseResult<FixedBlobSidecarList<T::EthSpec>>> {
) -> RpcResponseResult<FixedBlobSidecarList<T::EthSpec>> {
let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else {
return None;
return RpcResponseResult::RequestNotFound;
};

let resp = match blob {
Expand All @@ -631,20 +636,20 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Ok(Some(blobs)) => to_fixed_blob_sidecar_list(blobs)
.map(|blobs| (blobs, seen_timestamp))
.map_err(|e| (e.into(), request.resolve())),
Ok(None) => return None,
Ok(None) => return RpcResponseResult::NoOp,
Err(e) => Err((e.into(), request.resolve())),
}
}
RpcEvent::StreamTermination => match request.remove().terminate() {
Ok(_) => return None,
Ok(_) => return RpcResponseResult::StreamTermination,
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
// (err, false = not resolved) because terminate returns Ok() if resolved
Err(e) => Err((e.into(), false)),
},
RpcEvent::RPCError(e) => Err((e.into(), request.remove().resolve())),
};

match resp {
Ok(resp) => Some(Ok(resp)),
Ok(resp) => RpcResponseResult::Response(Ok(resp)),
// Track if this request has already returned some value downstream. Ensure that
// downstream code only receives a single Result per request. If the serving peer does
// multiple penalizable actions per request, downscore and return `RpcResponseResult::NoOp`. This allows to
Expand All @@ -655,9 +660,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.report_peer(peer_id, PeerAction::LowToleranceError, e.into());
}
if resolved {
None
RpcResponseResult::NoOp
} else {
Some(Err(e))
RpcResponseResult::Response(Err(e))
}
}
}
Expand Down