diff --git a/client/network/src/block_request_handler.rs b/client/network/src/block_request_handler.rs index 92f21f44f9d1c..b9b2535b36b32 100644 --- a/client/network/src/block_request_handler.rs +++ b/client/network/src/block_request_handler.rs @@ -44,7 +44,7 @@ pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig { name: generate_protocol_name(protocol_id).into(), max_request_size: 1024 * 1024, max_response_size: 16 * 1024 * 1024, - request_timeout: Duration::from_secs(40), + request_timeout: Duration::from_secs(20), inbound_queue: None, } } @@ -175,7 +175,7 @@ impl BlockRequestHandler { is_empty_justification, }; - total_size += block_data.body.len(); + total_size += block_data.body.iter().map(|ex| ex.len()).sum::(); blocks.push(block_data); if blocks.len() >= max_blocks as usize || total_size > MAX_BODY_BYTES { diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 4997bc36e53e7..4015e9e637766 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -754,7 +754,7 @@ impl Protocol { } else { None }, - receipt: if !block_data.message_queue.is_empty() { + receipt: if !block_data.receipt.is_empty() { Some(block_data.receipt) } else { None diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 35f840152217f..9b5381f276e45 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -61,7 +61,7 @@ mod blocks; mod extra_requests; /// Maximum blocks to request in a single packet. -const MAX_BLOCKS_TO_REQUEST: usize = 128; +const MAX_BLOCKS_TO_REQUEST: usize = 64; /// Maximum blocks to store in the import queue. const MAX_IMPORTING_BLOCKS: usize = 2048; @@ -813,12 +813,14 @@ impl ChainSync { self.pending_requests.add(who); if let Some(request) = request { match &mut peer.state { - PeerSyncState::DownloadingNew(start_block) => { + PeerSyncState::DownloadingNew(_) => { self.blocks.clear_peer_download(who); - let start_block = *start_block; peer.state = PeerSyncState::Available; - validate_blocks::(&blocks, who, Some(request))?; - self.blocks.insert(start_block, blocks, who.clone()); + if let Some(start_block) = + validate_blocks::(&blocks, who, Some(request))? + { + self.blocks.insert(start_block, blocks, who.clone()); + } self.blocks .drain(self.best_queued_number + One::one()) .into_iter() @@ -1790,13 +1792,14 @@ fn is_descendent_of(client: &T, base: &Block::Hash, block: &Block::Has } /// Validate that the given `blocks` are correct. +/// Returns the number of the first block in the sequence. /// -/// It is expected that `blocks` are in asending order. +/// It is expected that `blocks` are in ascending order. fn validate_blocks( blocks: &Vec>, who: &PeerId, request: Option>, -) -> Result<(), BadPeer> { +) -> Result>, BadPeer> { if let Some(request) = request { if Some(blocks.len() as _) > request.max { debug!( @@ -1889,7 +1892,7 @@ fn validate_blocks( } } - Ok(()) + Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number())) } #[cfg(test)] diff --git a/client/network/src/protocol/sync/blocks.rs b/client/network/src/protocol/sync/blocks.rs index 60492f24ed8c3..e173fe83dac1c 100644 --- a/client/network/src/protocol/sync/blocks.rs +++ b/client/network/src/protocol/sync/blocks.rs @@ -181,7 +181,7 @@ impl BlockCollection { for r in ranges { self.blocks.remove(&r); } - trace!(target: "sync", "Drained {} blocks", drained.len()); + trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from); drained } diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index 46fbb8f82d477..b88092b6e118a 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -935,3 +935,234 @@ fn continue_to_sync_after_some_block_announcement_verifications_failed() { net.block_until_sync(); assert!(net.peer(1).has_block(&block_hash)); } + +/// When being spammed by the same request of a peer, we ban this peer. However, we should only ban +/// this peer if the request was successful. In the case of a justification request for example, +/// we ask our peers multiple times until we got the requested justification. This test ensures that +/// asking for the same justification multiple times doesn't ban a peer. +#[test] +fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() { + sp_tracing::try_init_simple(); + let mut net = JustificationTestNet::new(2); + net.peer(0).push_blocks(10, false); + net.block_until_sync(); + + // there's currently no justification for block #10 + assert_eq!(net.peer(0).client().justifications(&BlockId::Number(10)).unwrap(), None); + assert_eq!(net.peer(1).client().justifications(&BlockId::Number(10)).unwrap(), None); + + let h1 = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap(); + + // Let's assume block 10 was finalized, but we still need the justification from the network. + net.peer(1).request_justification(&h1.hash().into(), 10); + + // Let's build some more blocks and wait always for the network to have synced them + for _ in 0..5 { + // We need to sleep 10 seconds as this is the time we wait between sending a new + // justification request. + std::thread::sleep(std::time::Duration::from_secs(10)); + net.peer(0).push_blocks(1, false); + net.block_until_sync(); + assert_eq!(1, net.peer(0).num_peers()); + } + + // Finalize the block and make the justification available. + net.peer(0) + .client() + .finalize_block(BlockId::Number(10), Some((*b"FRNK", Vec::new())), true) + .unwrap(); + + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + + if net.peer(1).client().justifications(&BlockId::Number(10)).unwrap() != + Some(Justifications::from((*b"FRNK", Vec::new()))) + { + return Poll::Pending + } + + Poll::Ready(()) + })); +} + +#[test] +fn syncs_all_forks_from_single_peer() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(2); + net.peer(0).push_blocks(10, false); + net.peer(1).push_blocks(10, false); + + // poll until the two nodes connect, otherwise announcing the block will not work + net.block_until_connected(); + + // Peer 0 produces new blocks and announces. + let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true); + + // Wait till peer 1 starts downloading + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).network().best_seen_block() != Some(12) { + return Poll::Pending + } + Poll::Ready(()) + })); + + // Peer 0 produces and announces another fork + let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false); + + net.block_until_sync(); + + // Peer 1 should have both branches, + assert!(net.peer(1).client().header(&BlockId::Hash(branch1)).unwrap().is_some()); + assert!(net.peer(1).client().header(&BlockId::Hash(branch2)).unwrap().is_some()); +} + +#[test] +fn syncs_after_missing_announcement() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(0); + net.add_full_peer_with_config(Default::default()); + // Set peer 1 to ignore announcement + net.add_full_peer_with_config(FullPeerConfig { + block_announce_validator: Some(Box::new(FailingBlockAnnounceValidator(11))), + ..Default::default() + }); + net.peer(0).push_blocks(10, false); + net.peer(1).push_blocks(10, false); + + net.block_until_connected(); + + // Peer 0 produces a new block and announces. Peer 1 ignores announcement. + net.peer(0).push_blocks_at(BlockId::Number(10), 1, false); + // Peer 0 produces another block and announces. + let final_block = net.peer(0).push_blocks_at(BlockId::Number(11), 1, false); + net.peer(1).push_blocks_at(BlockId::Number(10), 1, true); + net.block_until_sync(); + assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some()); +} + +#[test] +fn syncs_state() { + sp_tracing::try_init_simple(); + for skip_proofs in &[false, true] { + let mut net = TestNet::new(0); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(FullPeerConfig { + sync_mode: SyncMode::Fast { skip_proofs: *skip_proofs, storage_chain_mode: false }, + ..Default::default() + }); + net.peer(0).push_blocks(64, false); + // Wait for peer 1 to sync header chain. + net.block_until_sync(); + assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64))); + + let just = (*b"FRNK", Vec::new()); + net.peer(1) + .client() + .finalize_block(BlockId::Number(60), Some(just), true) + .unwrap(); + // Wait for state sync. + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).client.info().finalized_state.is_some() { + Poll::Ready(()) + } else { + Poll::Pending + } + })); + assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64))); + // Wait for the rest of the states to be imported. + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(1).client().has_state_at(&BlockId::Number(64)) { + Poll::Ready(()) + } else { + Poll::Pending + } + })); + } +} + +#[test] +fn syncs_indexed_blocks() { + use sp_runtime::traits::Hash; + sp_tracing::try_init_simple(); + let mut net = TestNet::new(0); + let mut n: u64 = 0; + net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, ..Default::default() }); + net.add_full_peer_with_config(FullPeerConfig { + storage_chain: true, + sync_mode: SyncMode::Fast { skip_proofs: false, storage_chain_mode: true }, + ..Default::default() + }); + net.peer(0).generate_blocks_at( + BlockId::number(0), + 64, + BlockOrigin::Own, + |mut builder| { + let ex = Extrinsic::Store(n.to_le_bytes().to_vec()); + n += 1; + builder.push(ex).unwrap(); + builder.build().unwrap().block + }, + false, + true, + true, + ); + let indexed_key = sp_runtime::traits::BlakeTwo256::hash(&42u64.to_le_bytes()); + assert!(net + .peer(0) + .client() + .as_full() + .unwrap() + .indexed_transaction(&indexed_key) + .unwrap() + .is_some()); + assert!(net + .peer(1) + .client() + .as_full() + .unwrap() + .indexed_transaction(&indexed_key) + .unwrap() + .is_none()); + + net.block_until_sync(); + assert!(net + .peer(1) + .client() + .as_full() + .unwrap() + .indexed_transaction(&indexed_key) + .unwrap() + .is_some()); +} + +#[test] +fn syncs_huge_blocks() { + use sp_core::storage::well_known_keys::HEAP_PAGES; + use sp_runtime::codec::Encode; + use substrate_test_runtime_client::BlockBuilderExt; + + sp_tracing::try_init_simple(); + let mut net = TestNet::new(2); + + // Increase heap space for bigger blocks. + net.peer(0).generate_blocks(1, BlockOrigin::Own, |mut builder| { + builder.push_storage_change(HEAP_PAGES.to_vec(), Some(256u64.encode())).unwrap(); + builder.build().unwrap().block + }); + + net.peer(0).generate_blocks(32, BlockOrigin::Own, |mut builder| { + // Add 32 extrinsics 32k each = 1MiB total + for _ in 0..32 { + let ex = Extrinsic::IncludeData([42u8; 32 * 1024].to_vec()); + builder.push(ex).unwrap(); + } + builder.build().unwrap().block + }); + + net.block_until_sync(); + assert_eq!(net.peer(0).client.info().best_number, 33); + assert_eq!(net.peer(1).client.info().best_number, 33); +}