diff --git a/prdoc/pr_5635.prdoc b/prdoc/pr_5635.prdoc new file mode 100644 index 000000000000..168d65970c95 --- /dev/null +++ b/prdoc/pr_5635.prdoc @@ -0,0 +1,13 @@ +title: Fix edge case where state sync is not triggered + +doc: + - audience: Node Dev + description: | + There is an edge case where the finalized block notification is received, but the conditions required to initiate the + state sync are not fully met. In such cases, state sync would fail to start as expected and remain stalled. + This patch addresses it by storing the pending attempt and trying to start the state sync later when the conditions + are satisfied. + +crates: + - name: sc-network-sync + bump: patch diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index f29ed1b083e8..cca83a5055cb 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -254,6 +254,14 @@ pub struct ChainSync { /// A set of hashes of blocks that are being downloaded or have been /// downloaded and are queued for import. queue_blocks: HashSet, + /// A pending attempt to start the state sync. + /// + /// The initiation of state sync may be deferred in cases where other conditions + /// are not yet met when the finalized block notification is received, such as + /// when `queue_blocks` is not empty or there are no peers. This field holds the + /// necessary information to attempt the state sync at a later point when + /// conditions are satisfied. + pending_state_sync_attempt: Option<(B::Hash, NumberFor, bool)>, /// Fork sync targets. fork_targets: HashMap>, /// A set of peers for which there might be potential block requests @@ -376,6 +384,7 @@ where extra_justifications: ExtraRequests::new("justification", metrics_registry), mode, queue_blocks: Default::default(), + pending_state_sync_attempt: None, fork_targets: Default::default(), allowed_requests: Default::default(), max_parallel_downloads, @@ -497,7 +506,7 @@ where "💔 New peer {} with unknown genesis hash {} ({}).", peer_id, best_hash, best_number, ); - return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH)); } // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have @@ -521,7 +530,7 @@ where state: PeerSyncState::Available, }, ); - return Ok(None) + return Ok(None); } // If we are at genesis, just start downloading. @@ -644,14 +653,14 @@ where if self.is_known(hash) { debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}"); - return + return; } trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}"); for peer_id in &peers { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::AncestorSearch { .. } = peer.state { - continue + continue; } if number > peer.best_number { @@ -748,14 +757,14 @@ where blocks } else { debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NO_BLOCK)) + return Err(BadPeer(*peer_id, rep::NO_BLOCK)); } validate_blocks::(&blocks, peer_id, Some(request))?; blocks @@ -796,14 +805,14 @@ where target: LOG_TARGET, "Invalid response when searching for ancestor from {peer_id}", ); - return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)) + return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR)); }, (_, Err(e)) => { info!( target: LOG_TARGET, "❌ Error answering legitimate blockchain query: {e}", ); - return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)) + return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR)); }, }; if matching_hash.is_some() { @@ -837,7 +846,7 @@ where target: LOG_TARGET, "Ancestry search: genesis mismatch for peer {peer_id}", ); - return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)) + return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH)); } if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *current, matching_hash.is_some()) @@ -852,7 +861,7 @@ where peer_id: *peer_id, request, }); - return Ok(()) + return Ok(()); } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -892,7 +901,7 @@ where .insert(*peer_id); } peer.state = PeerSyncState::Available; - return Ok(()) + return Ok(()); } }, PeerSyncState::Available | @@ -925,7 +934,7 @@ where } } else { // We don't know of this peer, so we also did not request anything from it. - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; self.validate_and_queue_blocks(new_blocks, gap); @@ -947,7 +956,7 @@ where target: LOG_TARGET, "💔 Called on_block_justification with a peer ID of an unknown peer", ); - return Ok(()) + return Ok(()); }; self.allowed_requests.add(&peer_id); @@ -964,7 +973,7 @@ where hash, block.hash, ); - return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)) + return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION)); } block @@ -990,7 +999,7 @@ where number, justifications, }); - return Ok(()) + return Ok(()); } } @@ -1013,26 +1022,11 @@ where }); if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode { - if self.state_sync.is_none() && !self.peers.is_empty() && self.queue_blocks.is_empty() { - // Finalized a recent block. - let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); - heads.sort(); - let median = heads[heads.len() / 2]; - if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { - if let Ok(Some(header)) = self.client.header(*hash) { - log::debug!( - target: LOG_TARGET, - "Starting state sync for #{number} ({hash})", - ); - self.state_sync = Some(StateSync::new( - self.client.clone(), - header, - None, - None, - *skip_proofs, - )); - self.allowed_requests.set_all(); - } + if self.state_sync.is_none() { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + self.attempt_state_sync(*hash, number, *skip_proofs); + } else { + self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs)); } } } @@ -1045,6 +1039,35 @@ where } } + fn attempt_state_sync( + &mut self, + finalized_hash: B::Hash, + finalized_number: NumberFor, + skip_proofs: bool, + ) { + let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect(); + heads.sort(); + let median = heads[heads.len() / 2]; + if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { + if let Ok(Some(header)) = self.client.header(finalized_hash) { + log::debug!( + target: LOG_TARGET, + "Starting state sync for #{finalized_number} ({finalized_hash})", + ); + self.state_sync = + Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs)); + self.allowed_requests.set_all(); + } else { + log::error!( + target: LOG_TARGET, + "Failed to start state sync: header for finalized block \ + #{finalized_number} ({finalized_hash}) is not available", + ); + debug_assert!(false); + } + } + } + /// Submit a validated block announcement. /// /// Returns new best hash & best number of the peer if they are updated. @@ -1067,12 +1090,12 @@ where peer } else { error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}"); - return Some((hash, number)) + return Some((hash, number)); }; if let PeerSyncState::AncestorSearch { .. } = peer.state { trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id); - return None + return None; } let peer_info = is_best.then(|| { @@ -1102,7 +1125,7 @@ where if let Some(target) = self.fork_targets.get_mut(&hash) { target.peers.insert(peer_id); } - return peer_info + return peer_info; } if ancient_parent { @@ -1113,7 +1136,7 @@ where hash, announce.header, ); - return peer_info + return peer_info; } if self.status().state == SyncState::Idle { @@ -1281,7 +1304,7 @@ where for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch { .. } = peer.state { // Wait for ancestry search to complete first. - continue + continue; } let new_common_number = if peer.best_number >= number { number } else { peer.best_number }; @@ -1401,7 +1424,7 @@ where /// What is the status of the block corresponding to the given hash? fn block_status(&self, hash: &B::Hash) -> Result { if self.queue_blocks.contains(hash) { - return Ok(BlockStatus::Queued) + return Ok(BlockStatus::Queued); } self.client.block_status(*hash) } @@ -1521,12 +1544,12 @@ where /// Get block requests scheduled by sync to be sent out. fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { if self.allowed_requests.is_empty() || self.state_sync.is_some() { - return Vec::new() + return Vec::new(); } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: LOG_TARGET, "Too many blocks in the queue."); - return Vec::new() + return Vec::new(); } let is_major_syncing = self.status().state.is_major_syncing(); let attrs = self.required_block_attributes(); @@ -1550,7 +1573,7 @@ where !allowed_requests.contains(&id) || !disconnected_peers.is_peer_available(&id) { - return None + return None; } // If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from @@ -1648,17 +1671,17 @@ where /// Get a state request scheduled by sync to be sent out (if any). fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { - return None + return None; } if self.state_sync.is_some() && self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { // Only one pending state request is allowed. - return None + return None; } if let Some(sync) = &self.state_sync { if sync.is_complete() { - return None + return None; } for (id, peer) in self.peers.iter_mut() { @@ -1670,7 +1693,7 @@ where let request = sync.next_request(); trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); self.allowed_requests.clear(); - return Some((*id, OpaqueStateRequest(Box::new(request)))) + return Some((*id, OpaqueStateRequest(Box::new(request)))); } } } @@ -1709,7 +1732,7 @@ where sync.import(*response) } else { debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); }; match import_result { @@ -1765,16 +1788,17 @@ where } for (result, hash) in results { if has_error { - break + break; } has_error |= result.is_err(); match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); - }, + } + }, Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { if aux.clear_justification_requests { trace!( @@ -1882,6 +1906,12 @@ where /// Get pending actions to perform. #[must_use] pub fn actions(&mut self) -> impl Iterator> { + if !self.peers.is_empty() && self.queue_blocks.is_empty() { + if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() { + self.attempt_state_sync(hash, number, skip_proofs); + } + } + let block_requests = self .block_requests() .into_iter() @@ -1964,7 +1994,7 @@ fn handle_ancestor_search_state( if block_hash_match && next_distance_to_tip == One::one() { // We found the ancestor in the first step so there is no need to execute binary // search. - return None + return None; } if block_hash_match { let left = curr_block_num; @@ -1983,7 +2013,7 @@ fn handle_ancestor_search_state( }, AncestorSearchState::BinarySearch(mut left, mut right) => { if left >= curr_block_num { - return None + return None; } if block_hash_match { left = curr_block_num; @@ -2014,7 +2044,7 @@ fn peer_block_request( ) -> Option<(Range>, BlockRequest)> { if best_num >= peer.best_number { // Will be downloaded as alternative fork instead. - return None + return None; } else if peer.common_number < finalized { trace!( target: LOG_TARGET, @@ -2103,7 +2133,7 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } if check_block(hash) != BlockStatus::Unknown { trace!( @@ -2112,7 +2142,7 @@ fn fork_sync_request( hash, r.number, ); - return false + return false; } true }); @@ -2121,7 +2151,7 @@ fn fork_sync_request( } for (hash, r) in fork_targets { if !r.peers.contains(&id) { - continue + continue; } // Download the fork only if it is behind or not too far ahead our tip of the chain // Otherwise it should be downloaded in full sync mode. @@ -2148,7 +2178,7 @@ fn fork_sync_request( direction: Direction::Descending, max: Some(count), }, - )) + )); } else { trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number); } @@ -2167,7 +2197,7 @@ where T: HeaderMetadata + ?Sized, { if base == block { - return Ok(false) + return Ok(false); } let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?; @@ -2194,7 +2224,7 @@ pub fn validate_blocks( blocks.len(), ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } let block_header = @@ -2214,7 +2244,7 @@ pub fn validate_blocks( block_header, ); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)); } if request.fields.contains(BlockAttributes::HEADER) && @@ -2225,7 +2255,7 @@ pub fn validate_blocks( "Missing requested header for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none()) @@ -2235,7 +2265,7 @@ pub fn validate_blocks( "Missing requested body for a block in response from {peer_id}.", ); - return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)) + return Err(BadPeer(*peer_id, rep::BAD_RESPONSE)); } } @@ -2250,7 +2280,7 @@ pub fn validate_blocks( b.hash, hash, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } if let (Some(header), Some(body)) = (&b.header, &b.body) { @@ -2268,7 +2298,7 @@ pub fn validate_blocks( expected, got, ); - return Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + return Err(BadPeer(*peer_id, rep::BAD_BLOCK)); } } }