Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Migrate network to std futures
Browse files Browse the repository at this point in the history
  • Loading branch information
expenses committed Nov 29, 2019
1 parent 96f1a99 commit 322cca5
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 141 deletions.
2 changes: 1 addition & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "pol
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
log = "0.4.8"
exit-future = "0.1.4"
exit-future = { git = "https://github.com/expenses/exit-future", branch = "modernize" }
substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }

Expand Down
12 changes: 6 additions & 6 deletions network/src/collator_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use substrate_network::PeerId;
use futures::sync::oneshot;
use futures03::channel::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -196,7 +196,7 @@ impl CollatorPool {
}

/// Wait for a collation from a parachain.
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: futures03::channel::oneshot::Sender<Collation>) {
self.collations.entry((relay_parent, para_id))
.or_insert_with(CollationSlot::blank_now)
.entries
Expand Down Expand Up @@ -230,7 +230,7 @@ mod tests {
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use futures::Future;
use futures03::executor::block_on;

fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
Expand Down Expand Up @@ -292,8 +292,8 @@ mod tests {
pov: make_pov(vec![4, 5, 6]),
});

rx1.wait().unwrap();
rx2.wait().unwrap();
block_on(rx1).unwrap();
block_on(rx2).unwrap();
assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
}

Expand Down Expand Up @@ -322,7 +322,7 @@ mod tests {

let (tx, rx) = oneshot::channel();
pool.await_collation(relay_parent, para_id, tx);
rx.wait().unwrap();
block_on(rx).unwrap();
}

#[test]
Expand Down
40 changes: 21 additions & 19 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ pub mod validation;
pub mod gossip;

use codec::{Decode, Encode};
use futures::sync::oneshot;
use futures::prelude::*;
use futures03::{channel::mpsc, compat::Compat, StreamExt};
use futures03::channel::mpsc;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
Expand All @@ -47,6 +45,8 @@ use self::local_collations::LocalCollations;
use log::{trace, debug, warn};

use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};

use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage};

Expand Down Expand Up @@ -109,7 +109,7 @@ impl NetworkService for PolkadotNetworkService {
Err(_) => mpsc::unbounded().1, // return empty channel.
};

GossipMessageStream::new(Box::new(Compat::new(topic_stream.map(Ok))))
GossipMessageStream::new(Box::new(topic_stream))
}

fn gossip_message(&self, topic: Hash, message: GossipMessage) {
Expand Down Expand Up @@ -152,32 +152,34 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {

/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Box<dyn Stream<Item = TopicNotification, Error = ()> + Send>,
topic_stream: Box<dyn futures03::Stream<Item = TopicNotification> + Unpin + Send>,
}

impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box<dyn Stream<Item = TopicNotification, Error = ()> + Send>) -> Self {
pub fn new(topic_stream: Box<dyn futures03::Stream<Item = TopicNotification> + Unpin + Send>) -> Self {
Self {
topic_stream,
}
}
}

impl Stream for GossipMessageStream {
impl futures03::Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);

loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
};

debug!(target: "validation", "Processing statement for live validation leaf-work");
if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
return Poll::Ready(Some((gmsg, msg.sender)))
}
}
}
Expand All @@ -194,7 +196,7 @@ struct PoVBlockRequest {
validation_leaf: Hash,
candidate_hash: Hash,
block_data_hash: Hash,
sender: oneshot::Sender<PoVBlock>,
sender: futures03::channel::oneshot::Sender<PoVBlock>,
canon_roots: StructuredUnroutedIngress,
}

Expand Down Expand Up @@ -331,8 +333,8 @@ impl PolkadotProtocol {
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: StructuredUnroutedIngress,
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();
) -> futures03::channel::oneshot::Receiver<PoVBlock> {
let (tx, rx) = futures03::channel::oneshot::channel();

self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(),
Expand Down Expand Up @@ -658,7 +660,7 @@ impl Specialization<Block> for PolkadotProtocol {
let retain = peer != &who;
if !retain {
// swap with a dummy value which will be dropped immediately.
let (sender, _) = oneshot::channel();
let (sender, _) = futures03::channel::oneshot::channel();
pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(),
validation_leaf: Default::default(),
Expand Down Expand Up @@ -753,8 +755,8 @@ impl PolkadotProtocol {
}
}

fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
let (tx, rx) = oneshot::channel();
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> futures03::channel::oneshot::Receiver<Collation> {
let (tx, rx) = futures03::channel::oneshot::channel();
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
self.collators.await_collation(relay_parent, para_id, tx);
rx
Expand Down
40 changes: 26 additions & 14 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use polkadot_primitives::parachain::{
OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
};
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};

use futures::prelude::*;
use futures03::task::SpawnExt;
use futures03::TryFutureExt;
use futures03::FutureExt;
use parking_lot::Mutex;
use log::{debug, trace};

Expand All @@ -58,14 +59,16 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
impl Stream<Item=SignedStatement, Error=()> {
impl futures03::Stream<Item=SignedStatement> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
use futures03::StreamExt;

network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
GossipMessage::Statement(s) => futures03::future::ready(Some(s.signed_statement)),
_ => futures03::future::ready(None)
})
}

Expand Down Expand Up @@ -100,7 +103,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
pub(crate) fn checked_statements(&self) -> impl futures03::Stream<Item=SignedStatement> {
checked_statements(&**self.network(), self.attestation_topic)
}

Expand Down Expand Up @@ -129,7 +132,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
Expand Down Expand Up @@ -173,17 +176,22 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w

if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = work.select2(self.fetcher.exit().clone()).then(|_| Ok(()));
self.fetcher.executor().spawn(work);

let work = futures03::future::select(
work,
self.fetcher.exit().clone()
)
.map(|_| ());
let _ = self.fetcher.executor().spawn(work);
}
}
}
}

fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()> + Send + 'static
-> impl futures03::Future<Output=()> + Send + 'static
where
D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static,
D: futures03::Future<Output=Result<PoVBlock,io::Error>> + Send + Unpin + 'static,
{
let table = self.table.clone();
let network = self.network().clone();
Expand All @@ -192,7 +200,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let parent_hash = self.parent_hash();

producer.prime(self.fetcher.api().clone())
.map(move |validated| {
.map_ok(move |validated| {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Expand All @@ -212,15 +220,19 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w

network.gossip_message(attestation_topic, statement.into());
})
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
.map(|res| {
if let Err(e) = res {
debug!(target: "p_net", "Failed to produce statements: {:?}", e);
}
})
}
}

impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
Expand Down
4 changes: 2 additions & 2 deletions network/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use substrate_network::{
specialization::NetworkSpecialization,
};

use futures::Future;
use futures03::executor::block_on;

mod validation;

Expand Down Expand Up @@ -244,7 +244,7 @@ fn fetches_from_those_with_knowledge() {
let pov_block = make_pov(block_data.0);
on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone())));
drop(protocol);
assert_eq!(recv.wait().unwrap(), pov_block);
assert_eq!(block_on(recv).unwrap(), pov_block);
}
}

Expand Down
38 changes: 20 additions & 18 deletions network/src/tests/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,23 @@ use sr_primitives::traits::{ApiRef, ProvideRuntimeApi};

use std::collections::HashMap;
use std::sync::Arc;
use futures::{prelude::*, sync::mpsc};
use std::pin::Pin;
use std::task::{Poll, Context};
use futures03::{prelude::*, channel::mpsc};
use codec::Encode;

use super::{TestContext, TestChainContext};

type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
type TaskExecutor = Arc<dyn futures03::task::Spawn + Send + Sync>;

#[derive(Clone, Copy)]
struct NeverExit;

impl Future for NeverExit {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
Poll::Pending
}
}

Expand Down Expand Up @@ -93,27 +94,28 @@ impl GossipRouter {
}

impl Future for GossipRouter {
type Item = ();
type Error = ();
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);

fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.incoming_messages.poll().unwrap() {
Async::Ready(Some((topic, message))) => self.add_message(topic, message),
Async::Ready(None) => panic!("ended early."),
Async::NotReady => break,
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}

loop {
match self.incoming_streams.poll().unwrap() {
Async::Ready(Some((topic, sender))) => self.add_outgoing(topic, sender),
Async::Ready(None) => panic!("ended early."),
Async::NotReady => break,
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}

Ok(Async::NotReady)
Poll::Pending
}
}

Expand Down
Loading

0 comments on commit 322cca5

Please sign in to comment.