Skip to content

Commit

Permalink
perf: Avoid an Option in the Map* futures (#2306)
Browse files Browse the repository at this point in the history
* perf: Avoid an Option in the `Map*` futures

* refactor: Rewrite UnfoldState with project_replace
  • Loading branch information
Markus Westerlind authored Jan 11, 2021
1 parent 1661bad commit 1db2b4e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 38 deletions.
2 changes: 1 addition & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project-lite = "0.2"
pin-project-lite = "0.2.4"

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
Expand Down
30 changes: 18 additions & 12 deletions futures-util/src/future/future/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use crate::fns::FnOnce1;
pin_project! {
/// Internal Map future
#[project = MapProj]
#[project_replace = MapProjReplace]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Map<Fut, F> {
Incomplete {
#[pin]
future: Fut,
f: Option<F>,
f: F,
},
Complete,
}
Expand All @@ -24,13 +25,14 @@ pin_project! {
impl<Fut, F> Map<Fut, F> {
/// Creates a new Map.
pub(crate) fn new(future: Fut, f: F) -> Self {
Self::Incomplete { future, f: Some(f) }
Self::Incomplete { future, f }
}
}

impl<Fut, F, T> FusedFuture for Map<Fut, F>
where Fut: Future,
F: FnOnce1<Fut::Output, Output=T>,
where
Fut: Future,
F: FnOnce1<Fut::Output, Output = T>,
{
fn is_terminated(&self) -> bool {
match self {
Expand All @@ -41,20 +43,24 @@ impl<Fut, F, T> FusedFuture for Map<Fut, F>
}

impl<Fut, F, T> Future for Map<Fut, F>
where Fut: Future,
F: FnOnce1<Fut::Output, Output=T>,
where
Fut: Future,
F: FnOnce1<Fut::Output, Output = T>,
{
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
match self.as_mut().project() {
MapProj::Incomplete { future, f } => {
MapProj::Incomplete { future, .. } => {
let output = ready!(future.poll(cx));
let f = f.take().unwrap();
self.set(Self::Complete);
Poll::Ready(f.call_once(output))
},
MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
match self.project_replace(Map::Complete) {
MapProjReplace::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
MapProjReplace::Complete => unreachable!(),
}
}
MapProj::Complete => {
panic!("Map must not be polled after it returned `Poll::Ready`")
}
}
}
}
6 changes: 3 additions & 3 deletions futures-util/src/sink/unfold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pin_project! {
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
Unfold {
function,
state: UnfoldState::Value(init),
state: UnfoldState::Value { value: init },
}
}

Expand All @@ -59,7 +59,7 @@ where
Some(value) => (this.function)(value, item),
None => panic!("start_send called without poll_ready being called first"),
};
this.state.set(UnfoldState::Future(future));
this.state.set(UnfoldState::Future { future });
Ok(())
}

Expand All @@ -68,7 +68,7 @@ where
Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
match ready!(future.poll(cx)) {
Ok(state) => {
this.state.set(UnfoldState::Value(state));
this.state.set(UnfoldState::Value { value: state });
Ok(())
}
Err(err) => Err(err),
Expand Down
8 changes: 5 additions & 3 deletions futures-util/src/stream/unfold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ where
{
Unfold {
f,
state: UnfoldState::Value(init),
state: UnfoldState::Value { value: init },
}
}

Expand Down Expand Up @@ -104,7 +104,9 @@ where
let mut this = self.project();

if let Some(state) = this.state.as_mut().take_value() {
this.state.set(UnfoldState::Future((this.f)(state)));
this.state.set(UnfoldState::Future {
future: (this.f)(state),
});
}

let step = match this.state.as_mut().project_future() {
Expand All @@ -113,7 +115,7 @@ where
};

if let Some((item, next_state)) = step {
this.state.set(UnfoldState::Value(next_state));
this.state.set(UnfoldState::Value { value: next_state });
Poll::Ready(Some(item))
} else {
this.state.set(UnfoldState::Empty);
Expand Down
43 changes: 24 additions & 19 deletions futures-util/src/unfold_state.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
use core::pin::Pin;

/// UnfoldState used for stream and sink unfolds
#[derive(Debug)]
pub(crate) enum UnfoldState<T, R> {
Value(T),
Future(/* #[pin] */ R),
Empty,
use pin_project_lite::pin_project;

pin_project! {
/// UnfoldState used for stream and sink unfolds
#[project = UnfoldStateProj]
#[project_replace = UnfoldStateProjReplace]
#[derive(Debug)]
pub(crate) enum UnfoldState<T, R> {
Value {
value: T,
},
Future {
#[pin]
future: R,
},
Empty,
}
}

impl<T, R> UnfoldState<T, R> {
pub(crate) fn project_future(self: Pin<&mut Self>) -> Option<Pin<&mut R>> {
// SAFETY Normal pin projection on the `Future` variant
unsafe {
match self.get_unchecked_mut() {
Self::Future(f) => Some(Pin::new_unchecked(f)),
_ => None,
}
match self.project() {
UnfoldStateProj::Future { future } => Some(future),
_ => None,
}
}

pub(crate) fn take_value(self: Pin<&mut Self>) -> Option<T> {
// SAFETY We only move out of the `Value` variant which is not pinned
match *self {
Self::Value(_) => unsafe {
match core::mem::replace(self.get_unchecked_mut(), UnfoldState::Empty) {
UnfoldState::Value(v) => Some(v),
_ => core::hint::unreachable_unchecked(),
}
match &*self {
UnfoldState::Value { .. } => match self.project_replace(UnfoldState::Empty) {
UnfoldStateProjReplace::Value { value } => Some(value),
_ => unreachable!(),
},
_ => None,
}
Expand Down

0 comments on commit 1db2b4e

Please sign in to comment.