Skip to content

[WIP] Follow latest pinning API #1266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ members = [
"futures-util",
"futures-test",
]

[patch.crates-io]
pin-utils = { git = "https://github.com/Kroisse/pin-utils.git", branch = "pin" }
16 changes: 9 additions & 7 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
#![feature(test, futures_api, pin, arbitrary_self_types)]

extern crate test;

use futures::ready;
use futures::channel::mpsc::{self, Sender, UnboundedSender};
use futures::executor::LocalPool;
use futures::stream::{Stream, StreamExt};
use futures::sink::Sink;
use futures::task::{self, Poll, Wake, LocalWaker};
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::Arc;
use test::Bencher;
use self::test::Bencher;

fn notify_noop() -> LocalWaker {
struct Noop;
Expand Down Expand Up @@ -100,16 +102,16 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Self::Item>>
{
let this = &mut *self;
let mut tx = PinMut::new(&mut this.tx);
let mut tx = Pin::new(&mut this.tx);

ready!(tx.reborrow().poll_ready(cx)).unwrap();
tx.reborrow().start_send(this.last + 1).unwrap();
ready!(tx.as_mut().poll_ready(cx)).unwrap();
tx.as_mut().start_send(this.last + 1).unwrap();
this.last += 1;
assert_eq!(Poll::Ready(Ok(())), tx.reborrow().poll_flush(cx));
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
Poll::Ready(Some(this.last))
}
}
Expand Down
14 changes: 7 additions & 7 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use std::any::Any;
use std::error::Error;
use std::fmt;
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
Expand Down Expand Up @@ -113,7 +113,7 @@ pub struct Sender<T> {
maybe_parked: bool,
}

// We never project PinMut<Sender> to `PinMut<T>`
// We never project Pin<&mut Sender> to `Pin<&mut T>`
impl<T> Unpin for Sender<T> {}

/// The transmission end of an unbounded mpsc channel.
Expand All @@ -139,7 +139,7 @@ pub struct Receiver<T> {
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Receiver<T>);

// `PinMut<UnboundedReceiver<T>>` is never projected to `PinMut<T>`
// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
impl<T> Unpin for UnboundedReceiver<T> {}

/// The error type for [`Sender`s](Sender) used as `Sink`s.
Expand Down Expand Up @@ -953,14 +953,14 @@ impl<T> Receiver<T> {
}
}

// The receiver does not ever take a PinMut to the inner T
// The receiver does not ever take a Pin to the inner T
impl<T> Unpin for Receiver<T> {}

impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
loop {
Expand Down Expand Up @@ -1030,10 +1030,10 @@ impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<T>> {
PinMut::new(&mut self.0).poll_next(cx)
Pin::new(&mut self.0).poll_next(cx)
}
}

Expand Down
6 changes: 3 additions & 3 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use futures_core::future::Future;
use futures_core::task::{self, Poll, Waker};
use std::marker::Unpin;
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
Expand All @@ -29,7 +29,7 @@ pub struct Sender<T> {
inner: Arc<Inner<T>>,
}

// The channels do not ever project PinMut to the inner T
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}

Expand Down Expand Up @@ -419,7 +419,7 @@ impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;

fn poll(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<T, Canceled>> {
self.inner.recv(cx)
Expand Down
26 changes: 13 additions & 13 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,30 @@ fn send_recv_no_buffer() {
let (tx, rx) = mpsc::channel::<i32>(0);
pin_mut!(tx, rx);

assert!(tx.reborrow().poll_flush(cx).is_ready());
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert!(tx.as_mut().poll_flush(cx).is_ready());
assert!(tx.as_mut().poll_ready(cx).is_ready());

// Send first message
assert!(tx.reborrow().start_send(1).is_ok());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().start_send(1).is_ok());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// poll_ready said Pending, so no room in buffer, therefore new sends
// should get rejected with is_full.
assert!(tx.reborrow().start_send(0).unwrap_err().is_full());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// Take the value
assert_eq!(rx.reborrow().poll_next(cx), Poll::Ready(Some(1)));
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
assert!(tx.as_mut().poll_ready(cx).is_ready());

// Send second message
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert!(tx.reborrow().start_send(2).is_ok());
assert!(tx.reborrow().poll_ready(cx).is_pending());
assert!(tx.as_mut().poll_ready(cx).is_ready());
assert!(tx.as_mut().start_send(2).is_ok());
assert!(tx.as_mut().poll_ready(cx).is_pending());

// Take the value
assert_eq!(rx.reborrow().poll_next(cx), Poll::Ready(Some(2)));
assert!(tx.reborrow().poll_ready(cx).is_ready());
assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
assert!(tx.as_mut().poll_ready(cx).is_ready());

Poll::Ready(())
}));
Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::channel::oneshot::{self, Sender};
use futures::executor::block_on;
use futures::future::{Future, FutureExt, poll_fn};
use futures::task::{self, Poll};
use std::pin::PinMut;
use std::pin::Pin;
use std::sync::mpsc;
use std::thread;

Expand Down Expand Up @@ -42,7 +42,7 @@ struct WaitForCancel {
impl Future for WaitForCancel {
type Output = ();

fn poll(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
self.tx.poll_cancel(cx)
}
}
Expand Down
6 changes: 3 additions & 3 deletions futures-core/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Futures.

use crate::task::{self, Poll};
use core::pin::PinMut;
use core::pin::Pin;

pub use core::future::{Future, FutureObj, LocalFutureObj, UnsafeFutureObj};

Expand All @@ -20,7 +20,7 @@ pub trait TryFuture {
/// directly inheriting from the `Future` trait; in the future it won't be
/// needed.
fn try_poll(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Result<Self::Ok, Self::Error>>;
}
Expand All @@ -32,7 +32,7 @@ impl<F, T, E> TryFuture for F
type Error = E;

#[inline]
fn try_poll(self: PinMut<Self>, cx: &mut task::Context) -> Poll<F::Output> {
fn try_poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<F::Output> {
self.poll(cx)
}
}
43 changes: 21 additions & 22 deletions futures-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::task::{self, Poll};
use core::marker::Unpin;
use core::pin::PinMut;
use core::pin::Pin;

#[cfg(feature = "either")]
use either::Either;
Expand Down Expand Up @@ -52,7 +52,7 @@ pub trait Stream {
/// to ensure that `poll_next` always returns `Ready(None)` in subsequent
/// calls.
fn poll_next(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>>;
}
Expand All @@ -61,21 +61,21 @@ impl<'a, S: ?Sized + Stream + Unpin> Stream for &'a mut S {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
S::poll_next(PinMut::new(&mut **self), cx)
S::poll_next(Pin::new(&mut **self), cx)
}
}

impl<'a, S: ?Sized + Stream> Stream for PinMut<'a, S> {
impl<'a, S: ?Sized + Stream> Stream for Pin<&'a mut S> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
S::poll_next((*self).reborrow(), cx)
S::poll_next((*self).as_mut(), cx)
}
}

Expand All @@ -86,11 +86,11 @@ impl<A, B> Stream for Either<A, B>
{
type Item = A::Item;

fn poll_next(self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<A::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<A::Item>> {
unsafe {
match PinMut::get_mut_unchecked(self) {
Either::Left(a) => PinMut::new_unchecked(a).poll_next(cx),
Either::Right(b) => PinMut::new_unchecked(b).poll_next(cx),
match Pin::get_mut_unchecked(self) {
Either::Left(a) => Pin::new_unchecked(a).poll_next(cx),
Either::Right(b) => Pin::new_unchecked(b).poll_next(cx),
}
}
}
Expand All @@ -110,7 +110,7 @@ pub trait TryStream {
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Stream` trait; in the future it won't be
/// needed.
fn try_poll_next(self: PinMut<Self>, cx: &mut task::Context)
fn try_poll_next(self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

Expand All @@ -120,7 +120,7 @@ impl<S, T, E> TryStream for S
type Ok = T;
type Error = E;

fn try_poll_next(self: PinMut<Self>, cx: &mut task::Context)
fn try_poll_next(self: Pin<&mut Self>, cx: &mut task::Context)
-> Poll<Option<Result<Self::Ok, Self::Error>>>
{
self.poll_next(cx)
Expand All @@ -129,46 +129,45 @@ impl<S, T, E> TryStream for S

if_std! {
use std::boxed::Box;
use std::pin::PinBox;

impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
PinMut::new(&mut **self).poll_next(cx)
Pin::new(&mut **self).poll_next(cx)
}
}

impl<S: ?Sized + Stream> Stream for PinBox<S> {
impl<S: ?Sized + Stream> Stream for Pin<Box<S>> {
type Item = S::Item;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
self.as_pin_mut().poll_next(cx)
S::poll_next((*self).as_mut(), cx)
}
}

impl<S: Stream> Stream for ::std::panic::AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(
self: PinMut<Self>,
self: Pin<&mut Self>,
cx: &mut task::Context,
) -> Poll<Option<S::Item>> {
unsafe { PinMut::map_unchecked(self, |x| &mut x.0) }.poll_next(cx)
unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }.poll_next(cx)
}
}

impl<T: Unpin> Stream for ::std::collections::VecDeque<T> {
type Item = T;

fn poll_next(
mut self: PinMut<Self>,
mut self: Pin<&mut Self>,
_cx: &mut task::Context,
) -> Poll<Option<Self::Item>> {
Poll::Ready(self.pop_front())
Expand Down
Loading