Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Simplify trait bounds in network to prepare for collator-rpc #12082

Merged
merged 37 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ceb706e
Hack towards PoC
skunert Mar 15, 2022
b2e5b5d
Abstract away runtime requirement
skunert Mar 15, 2022
97264ff
blockchainevents
skunert Mar 24, 2022
21ead6f
Remove bitswap
skunert Apr 5, 2022
bacd2b4
Merge branch 'master' into collator-rpc-poc
Apr 8, 2022
e74cc52
Remove unused sync more
skunert Apr 25, 2022
22ec4b9
Merge branch 'cumulus-2022-04-25' into collator-rpc-poc
skunert Apr 25, 2022
5ef24b8
Merge branch 'cumulus-2022-05-11' into collator-rpc-poc
skunert May 11, 2022
b0a149e
Remove unused features in network
skunert May 27, 2022
a1b1244
Re-enable bitswap change
skunert May 27, 2022
ebd73bd
Merge branch 'cumulus-2022-05-30' into collator-rpc-poc
skunert May 30, 2022
a2f2c60
Remove `Chain` trait bound
skunert Jun 1, 2022
44da032
Reimplement blockchain-rpc-events
skunert Jun 20, 2022
acf4446
Merge branch 'cumulus-2022-06-20' into collator-rpc-poc
skunert Jun 20, 2022
63581d0
Move network to cumulus
skunert Jun 22, 2022
ab192df
Make AuthorityDiscovery async
skunert Jun 23, 2022
9aae7e0
Merge branch 'cumulus-2022-07-11' into collator-rpc-poc
skunert Jul 11, 2022
f52fa93
Merge branch 'cumulus-2022-07-26' into collator-rpc-poc
skunert Jul 26, 2022
8b7820a
Merge branch 'cumulus-2022-08-03' into collator-rpc-poc
skunert Aug 3, 2022
9782ad4
Remove `ProofProvider` requirement from network behaviour
skunert Aug 5, 2022
f02bd87
Extract bitswap
skunert Aug 8, 2022
1ee5965
Merge branch 'cumulus-2022-08-11' into collator-rpc-poc
skunert Aug 11, 2022
6e0645c
Adjustments after merge
skunert Aug 11, 2022
ddc8ef9
Remove HeaderMetadata trait from network
skunert Aug 17, 2022
054c327
Introduce NetworkHeaderBackend
skunert Aug 18, 2022
a7213c2
Add comments
skunert Aug 19, 2022
e040f2f
Merge commit 'b7d2cb5c2dbfbc49cb3c06f6198daa60b04f16c0' into collator…
skunert Aug 22, 2022
7165e1a
Improve comments
skunert Aug 22, 2022
25067f6
Move NetworkHeaderBackend to new module
skunert Aug 22, 2022
073e40f
Improve naming, remove redundand send + sync
skunert Aug 22, 2022
a240068
Clean up generics
skunert Aug 22, 2022
25578ec
Fix CI
skunert Aug 23, 2022
f3663f3
Merge branch 'master' into collator-rpc-poc
skunert Aug 24, 2022
0459dd1
Improve comment and readability
skunert Aug 25, 2022
f0f56ec
Remove NetworkHeaderBackend
skunert Aug 26, 2022
0659230
Merge branch 'master' into collator-rpc-poc
skunert Aug 29, 2022
b4b49a6
Fix Cargo.lock
skunert Aug 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
blockchainevents
  • Loading branch information
skunert committed Mar 24, 2022
commit 97264ff74610b2bcb2a931b6b505cf8368701bce
15 changes: 11 additions & 4 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

//! A set of APIs supported by the client along with their primitives.

use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};
use futures::Stream;
use sp_consensus::BlockOrigin;
use sp_core::storage::StorageKey;
use sp_runtime::{
generic::{BlockId, SignedBlock},
generic::{Block, BlockId, SignedBlock},
traits::{Block as BlockT, NumberFor},
Justifications,
};
use std::{collections::HashSet, convert::TryFrom, fmt, sync::Arc};

use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};
use std::{collections::HashSet, convert::TryFrom, fmt, pin::Pin, sync::Arc};

use sc_transaction_pool_api::ChainEvent;
use sc_utils::mpsc::TracingUnboundedReceiver;
Expand Down Expand Up @@ -57,6 +57,13 @@ pub trait BlockOf {
type Type: BlockT;
}

pub trait BlockchainRPCEvents<Block: BlockT> {
fn finality_notification_stream(&self) -> Pin<Box<dyn Stream<Item = Block::Header> + Send>>;

fn import_notification_stream(&self) -> Pin<Box<dyn Stream<Item = Block::Header> + Send>>;

fn best_head_stream(&self) -> Pin<Box<dyn Stream<Item = Block::Header> + Send>>;
}
/// A source of blockchain events.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream. Not guaranteed to be fired for every
Expand Down
12 changes: 6 additions & 6 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
build_network_future,
build_network_collator_future, build_network_future,
client::{Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig, TransactionStorageMode},
error::Error,
Expand All @@ -31,7 +31,8 @@ use prometheus_endpoint::Registry;
use sc_chain_spec::get_extension;
use sc_client_api::{
execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
BlockBackend, BlockchainEvents, BlockchainRPCEvents, ExecutorProvider, ForkBlocks,
StorageProvider, UsageProvider,
};
use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
Expand Down Expand Up @@ -745,7 +746,7 @@ pub struct BuildCollatorNetworkParams<'a, TImpQu, TCl> {
}

/// Build the network service, the network status sinks and an RPC sender.
pub fn build_collator_network<TBl, TExPool, TImpQu, TCl>(
pub fn build_collator_network<TBl, TImpQu, TCl>(
params: BuildCollatorNetworkParams<TImpQu, TCl>,
) -> Result<
(
Expand All @@ -763,9 +764,8 @@ where
+ BlockIdTo<TBl, Error = sp_blockchain::Error>
+ ProofProvider<TBl>
+ HeaderBackend<TBl>
+ BlockchainEvents<TBl>
+ BlockchainRPCEvents<TBl>
+ 'static,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TImpQu: ImportQueue<TBl> + 'static,
{
let BuildCollatorNetworkParams { config, client, spawn_handle, import_queue } = params;
Expand Down Expand Up @@ -873,7 +873,7 @@ where

let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc");

let future = build_network_future(
let future = build_network_collator_future(
config.role.clone(),
network_mut,
client,
Expand Down
153 changes: 152 additions & 1 deletion client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub use sc_chain_spec::{
ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
Properties, RuntimeGenesis,
};
use sc_client_api::{blockchain::HeaderBackend, BlockchainEvents};
use sc_client_api::{blockchain::HeaderBackend, BlockchainEvents, BlockchainRPCEvents};
pub use sc_consensus::ImportQueue;
pub use sc_executor::NativeExecutionDispatch;
#[doc(hidden)]
Expand Down Expand Up @@ -273,6 +273,157 @@ async fn build_network_future<
}
}

/// Builds a never-ending future that continuously polls the network.
///
/// The `status_sink` contain a list of senders to send a periodic network status to.
async fn build_network_collator_future<
B: BlockT,
C: BlockchainRPCEvents<B> + HeaderBackend<B>,
H: sc_network::ExHashT,
>(
role: Role,
mut network: sc_network::NetworkWorker<B, H>,
client: Arc<C>,
mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
should_have_peers: bool,
announce_imported_blocks: bool,
) {
let mut imported_blocks_stream = client.import_notification_stream().fuse();

// Current best block at initialization, to report to the RPC layer.
let starting_block = client.info().best_number;

// Stream of finalized blocks reported by the client.
let mut finality_notification_stream = client.finality_notification_stream().fuse();
let mut new_best_notification_stream = client.best_head_stream().fuse();

loop {
futures::select! {
// List of blocks that the client has imported.
notification = imported_blocks_stream.next() => {
let notification = match notification {
Some(n) => n,
// If this stream is shut down, that means the client has shut down, and the
// most appropriate thing to do for the network future is to shut down too.
None => return,
};

// TODO skunert No need to announce anything
if announce_imported_blocks {
network.service().announce_block(notification.hash(), None);
}
}

notification = new_best_notification_stream.next() => {
let notification = match notification {
Some(n) => n,
// If this stream is shut down, that means the client has shut down, and the
// most appropriate thing to do for the network future is to shut down too.
None => return,
};

network.service().new_best_block_imported(
notification.hash(),
notification.number().clone(),
);
}

// List of blocks that the client has finalized.
notification = finality_notification_stream.select_next_some() => {
network.on_block_finalized(notification.hash(), notification);
}

// Answer incoming RPC requests.
request = rpc_rx.select_next_some() => {
match request {
sc_rpc::system::Request::Health(sender) => {
let _ = sender.send(sc_rpc::system::Health {
peers: network.peers_debug_info().len(),
is_syncing: network.service().is_major_syncing(),
should_have_peers,
});
},
sc_rpc::system::Request::LocalPeerId(sender) => {
let _ = sender.send(network.local_peer_id().to_base58());
},
sc_rpc::system::Request::LocalListenAddresses(sender) => {
let peer_id = network.local_peer_id().clone().into();
let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
let addresses = network.listen_addresses()
.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
.collect();
let _ = sender.send(addresses);
},
sc_rpc::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
sc_rpc::system::PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
best_hash: p.best_hash,
best_number: p.best_number,
}
).collect());
}
sc_rpc::system::Request::NetworkState(sender) => {
if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() {
let _ = sender.send(network_state);
}
}
sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
let x = network.add_reserved_peer(peer_addr)
.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
let _ = sender.send(x);
}
sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
let _ = match peer_id.parse::<PeerId>() {
Ok(peer_id) => {
network.remove_reserved_peer(peer_id);
sender.send(Ok(()))
}
Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
e.to_string(),
))),
};
}
sc_rpc::system::Request::NetworkReservedPeers(sender) => {
let reserved_peers = network.reserved_peers();
let reserved_peers = reserved_peers
.map(|peer_id| peer_id.to_base58())
.collect();

let _ = sender.send(reserved_peers);
}
sc_rpc::system::Request::NodeRoles(sender) => {
use sc_rpc::system::NodeRole;

let node_role = match role {
Role::Authority { .. } => NodeRole::Authority,
Role::Light => NodeRole::LightClient,
Role::Full => NodeRole::Full,
};

let _ = sender.send(vec![node_role]);
}
sc_rpc::system::Request::SyncState(sender) => {
use sc_rpc::system::SyncState;

let _ = sender.send(SyncState {
starting_block: starting_block,
current_block: client.info().best_number,
highest_block: network.best_seen_block(),
});
}
}
}

// The network worker has done something. Nothing special to do, but could be
// used in the future to perform actions in response of things that happened on
// the network.
_ = (&mut network).fuse() => {}
}
}
}

// Wrapper for HTTP and WS servers that makes sure they are properly shut down.
mod waiting {
pub struct HttpServer(pub Option<sc_rpc_server::HttpServer>);
Expand Down