Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Set reserved nodes with offchain worker. (#6996)
Browse files Browse the repository at this point in the history
* add offchain worker api to set reserved nodes.

* new offchain api to get node public key.

* node public key from converter

* refactor set reserved nodes ocw api.

* new ndoe authorization pallet

* remove unnecessary clone and more.

* more

* tests for node authorization pallet

* remove dependency

* fix build

* more tests.

* refactor

* Update primitives/core/src/offchain/testing.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update frame/node-authorization/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update frame/node-authorization/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* Update frame/node-authorization/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* format code

* expose NetworkService

* remove NetworkStateInfo in offchain

* replace NodePublicKey with PeerId.

* set max length of peer id.

* clear more

* use BTreeSet for set of peers.

* decode opaque peer id.

* extract NetworkProvider for client offchain.

* use OpaquePeerId in node authorization pallet.

* fix test

* better documentation

* fix test

* doc

* more fix

* Update primitives/core/src/offchain/mod.rs

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update client/offchain/src/api.rs

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* derive serialize and deserialize

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
  • Loading branch information
3 people authored Sep 10, 2020
1 parent d204ebe commit 8646de9
Show file tree
Hide file tree
Showing 15 changed files with 1,080 additions and 44 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ members = [
"frame/metadata",
"frame/multisig",
"frame/nicks",
"frame/node-authorization",
"frame/offences",
"frame/proxy",
"frame/randomness-collective-flip",
Expand Down
18 changes: 17 additions & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//!
//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in
//! order fo the network to advance.
//! order for the network to advance.
//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
//!
Expand Down Expand Up @@ -605,6 +605,22 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
&self.local_peer_id
}

/// Set authorized peers.
///
/// Need a better solution to manage authorized peers, but now just use reserved peers for
/// prototyping.
pub fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
self.peerset.set_reserved_peers(peers)
}

/// Set authorized_only flag.
///
/// Need a better solution to decide authorized_only, but now just use reserved_only flag for
/// prototyping.
pub fn set_authorized_only(&self, reserved_only: bool) {
self.peerset.set_reserved_only(reserved_only)
}

/// Appends a notification to the buffer of pending outgoing notifications with the given peer.
/// Has no effect if the notifications channel with this protocol name is not open.
///
Expand Down
50 changes: 35 additions & 15 deletions client/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ use std::{
sync::Arc,
convert::TryFrom,
thread::sleep,
collections::HashSet,
};

use sp_core::offchain::OffchainStorage;
use crate::NetworkProvider;
use futures::Future;
use log::error;
use sc_network::{PeerId, Multiaddr, NetworkStateInfo};
use sc_network::{PeerId, Multiaddr};
use codec::{Encode, Decode};
use sp_core::OpaquePeerId;
use sp_core::offchain::{
Externalities as OffchainExt, HttpRequestId, Timestamp, HttpRequestStatus, HttpError,
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
OffchainStorage, OpaqueNetworkState, OpaqueMultiaddr, StorageKind,
};
pub use sp_offchain::STORAGE_PREFIX;
pub use http::SharedClient;
Expand All @@ -49,8 +51,8 @@ mod timestamp;
pub(crate) struct Api<Storage> {
/// Offchain Workers database.
db: Storage,
/// A NetworkState provider.
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
/// A provider for substrate networking.
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
/// Is this node a potential validator?
is_validator: bool,
/// Everything HTTP-related is handled by a different struct.
Expand All @@ -73,10 +75,10 @@ impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
}

fn network_state(&self) -> Result<OpaqueNetworkState, ()> {
let external_addresses = self.network_state.external_addresses();
let external_addresses = self.network_provider.external_addresses();

let state = NetworkState::new(
self.network_state.local_peer_id(),
self.network_provider.local_peer_id(),
external_addresses,
);
Ok(OpaqueNetworkState::from(state))
Expand Down Expand Up @@ -180,6 +182,15 @@ impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
) -> Result<usize, HttpError> {
self.http.response_read_body(request_id, buffer, deadline)
}

fn set_authorized_nodes(&mut self, nodes: Vec<OpaquePeerId>, authorized_only: bool) {
let peer_ids: HashSet<PeerId> = nodes.into_iter()
.filter_map(|node| PeerId::from_bytes(node.0).ok())
.collect();

self.network_provider.set_authorized_peers(peer_ids);
self.network_provider.set_authorized_only(authorized_only);
}
}

/// Information about the local node's network state.
Expand Down Expand Up @@ -256,18 +267,18 @@ pub(crate) struct AsyncApi {
}

impl AsyncApi {
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
pub fn new<S: OffchainStorage>(
db: S,
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
is_validator: bool,
shared_client: SharedClient,
) -> (Api<S>, Self) {
let (http_api, http_worker) = http::http(shared_client);

let api = Api {
db,
network_state,
network_provider,
is_validator,
http: http_api,
};
Expand All @@ -292,11 +303,21 @@ mod tests {
use super::*;
use std::{convert::{TryFrom, TryInto}, time::SystemTime};
use sc_client_db::offchain::LocalStorage;
use sc_network::PeerId;
use sc_network::{NetworkStateInfo, PeerId};

struct MockNetworkStateInfo();
struct TestNetwork();

impl NetworkProvider for TestNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
unimplemented!()
}

impl NetworkStateInfo for MockNetworkStateInfo {
fn set_authorized_only(&self, _reserved_only: bool) {
unimplemented!()
}
}

impl NetworkStateInfo for TestNetwork {
fn external_addresses(&self) -> Vec<Multiaddr> {
Vec::new()
}
Expand All @@ -309,10 +330,9 @@ mod tests {
fn offchain_api() -> (Api<LocalStorage>, AsyncApi) {
let _ = env_logger::try_init();
let db = LocalStorage::new_test();
let mock = Arc::new(MockNetworkStateInfo());
let mock = Arc::new(TestNetwork());
let shared_client = SharedClient::new();


AsyncApi::new(
db,
mock,
Expand Down
59 changes: 49 additions & 10 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@

#![warn(missing_docs)]

use std::{fmt, marker::PhantomData, sync::Arc};
use std::{
fmt, marker::PhantomData, sync::Arc,
collections::HashSet,
};

use parking_lot::Mutex;
use threadpool::ThreadPool;
use sp_api::{ApiExt, ProvideRuntimeApi};
use futures::future::Future;
use log::{debug, warn};
use sc_network::NetworkStateInfo;
use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId};
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};
Expand All @@ -50,6 +53,30 @@ use api::SharedClient;

pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};

/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
/// underlying Substrate networking.
pub trait NetworkProvider: NetworkStateInfo {
/// Set the authorized peers.
fn set_authorized_peers(&self, peers: HashSet<PeerId>);

/// Set the authorized only flag.
fn set_authorized_only(&self, reserved_only: bool);
}

impl<B, H> NetworkProvider for NetworkService<B, H>
where
B: traits::Block + 'static,
H: ExHashT,
{
fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
self.set_authorized_peers(peers)
}

fn set_authorized_only(&self, reserved_only: bool) {
self.set_authorized_only(reserved_only)
}
}

/// An offchain workers manager.
pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
client: Arc<Client>,
Expand Down Expand Up @@ -98,7 +125,7 @@ impl<Client, Storage, Block> OffchainWorkers<
pub fn on_block_imported(
&self,
header: &Block::Header,
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
is_validator: bool,
) -> impl Future<Output = ()> {
let runtime = self.client.runtime_api();
Expand All @@ -122,7 +149,7 @@ impl<Client, Storage, Block> OffchainWorkers<
if version > 0 {
let (api, runner) = api::AsyncApi::new(
self.db.clone(),
network_state.clone(),
network_provider,
is_validator,
self.shared_client.clone(),
);
Expand Down Expand Up @@ -173,7 +200,7 @@ pub async fn notification_future<Client, Storage, Block, Spawner>(
client: Arc<Client>,
offchain: Arc<OffchainWorkers<Client, Storage, Block>>,
spawner: Spawner,
network_state_info: Arc<dyn NetworkStateInfo + Send + Sync>,
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
)
where
Block: traits::Block,
Expand All @@ -188,7 +215,7 @@ pub async fn notification_future<Client, Storage, Block, Spawner>(
"offchain-on-block",
offchain.on_block_imported(
&n.header,
network_state_info.clone(),
network_provider.clone(),
is_validator,
).boxed(),
);
Expand All @@ -213,9 +240,9 @@ mod tests {
use sc_transaction_pool::{BasicPool, FullChainApi};
use sp_transaction_pool::{TransactionPool, InPoolTransaction};

struct MockNetworkStateInfo();
struct TestNetwork();

impl NetworkStateInfo for MockNetworkStateInfo {
impl NetworkStateInfo for TestNetwork {
fn external_addresses(&self) -> Vec<Multiaddr> {
Vec::new()
}
Expand All @@ -225,6 +252,16 @@ mod tests {
}
}

impl NetworkProvider for TestNetwork {
fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
unimplemented!()
}

fn set_authorized_only(&self, _reserved_only: bool) {
unimplemented!()
}
}

struct TestPool(
Arc<BasicPool<FullChainApi<TestClient, Block>, Block>>
);
Expand Down Expand Up @@ -255,12 +292,14 @@ mod tests {
client.clone(),
));
let db = sc_client_db::offchain::LocalStorage::new_test();
let network_state = Arc::new(MockNetworkStateInfo());
let network = Arc::new(TestNetwork());
let header = client.header(&BlockId::number(0)).unwrap().unwrap();

// when
let offchain = OffchainWorkers::new(client, db);
futures::executor::block_on(offchain.on_block_imported(&header, network_state, false));
futures::executor::block_on(
offchain.on_block_imported(&header, network, false)
);

// then
assert_eq!(pool.0.status().ready, 1);
Expand Down
12 changes: 12 additions & 0 deletions client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const FORGET_AFTER: Duration = Duration::from_secs(3600);
enum Action {
AddReservedPeer(PeerId),
RemoveReservedPeer(PeerId),
SetReservedPeers(HashSet<PeerId>),
SetReservedOnly(bool),
ReportPeer(PeerId, ReputationChange),
SetPriorityGroup(String, HashSet<PeerId>),
Expand Down Expand Up @@ -102,6 +103,11 @@ impl PeersetHandle {
pub fn set_reserved_only(&self, reserved: bool) {
let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved));
}

/// Set reserved peers to the new set.
pub fn set_reserved_peers(&self, peer_ids: HashSet<PeerId>) {
let _ = self.tx.unbounded_send(Action::SetReservedPeers(peer_ids));
}

/// Reports an adjustment to the reputation of the given peer.
pub fn report_peer(&self, peer_id: PeerId, score_diff: ReputationChange) {
Expand Down Expand Up @@ -246,6 +252,10 @@ impl Peerset {
fn on_remove_reserved_peer(&mut self, peer_id: PeerId) {
self.on_remove_from_priority_group(RESERVED_NODES, peer_id);
}

fn on_set_reserved_peers(&mut self, peer_ids: HashSet<PeerId>) {
self.on_set_priority_group(RESERVED_NODES, peer_ids);
}

fn on_set_reserved_only(&mut self, reserved_only: bool) {
self.reserved_only = reserved_only;
Expand Down Expand Up @@ -655,6 +665,8 @@ impl Stream for Peerset {
self.on_add_reserved_peer(peer_id),
Action::RemoveReservedPeer(peer_id) =>
self.on_remove_reserved_peer(peer_id),
Action::SetReservedPeers(peer_ids) =>
self.on_set_reserved_peers(peer_ids),
Action::SetReservedOnly(reserved) =>
self.on_set_reserved_only(reserved),
Action::ReportPeer(peer_id, score_diff) =>
Expand Down
2 changes: 1 addition & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ pub fn build_offchain_workers<TBl, TBackend, TCl>(
client.clone(),
offchain,
Clone::clone(&spawn_handle),
network.clone()
network.clone(),
)
);
}
Expand Down
3 changes: 2 additions & 1 deletion frame/im-online/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use super::*;

use frame_system::RawOrigin;
use frame_benchmarking::benchmarks;
use sp_core::offchain::{OpaquePeerId, OpaqueMultiaddr};
use sp_core::OpaquePeerId;
use sp_core::offchain::OpaqueMultiaddr;
use sp_runtime::traits::{ValidateUnsigned, Zero};
use sp_runtime::transaction_validity::TransactionSource;
use frame_support::traits::UnfilteredDispatchable;
Expand Down
2 changes: 1 addition & 1 deletion frame/im-online/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

use super::*;
use crate::mock::*;
use sp_core::OpaquePeerId;
use sp_core::offchain::{
OpaquePeerId,
OffchainExt,
TransactionPoolExt,
testing::{TestOffchainExt, TestTransactionPoolExt},
Expand Down
Loading

0 comments on commit 8646de9

Please sign in to comment.