Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move stream items into tokio-stream #3277

Merged
merged 9 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ missing a difficulty rating, and you should feel free to add one.
- **M-process** The `tokio::process` module.
- **M-runtime** The `tokio::runtime` module.
- **M-signal** The `tokio::signal` module.
- **M-stream** The `tokio::stream` module.
- **M-sync** The `tokio::sync` module.
- **M-task** The `tokio::task` module.
- **M-time** The `tokio::time` module.
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"tokio",
"tokio-macros",
"tokio-test",
"tokio-stream",
"tokio-util",

# Internal
Expand Down
12 changes: 8 additions & 4 deletions benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ fn create_100_000_medium(b: &mut Bencher) {
}

fn send_medium(b: &mut Bencher) {
let rt = rt();

b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Medium>(1000);

let _ = tx.try_send([0; 64]);
let _ = rt.block_on(tx.send([0; 64]));

rx.try_recv().unwrap();
rt.block_on(rx.recv()).unwrap();
});
}

fn send_large(b: &mut Bencher) {
let rt = rt();

b.iter(|| {
let (tx, mut rx) = mpsc::channel::<Large>(1000);

let _ = tx.try_send([[0; 64]; 64]);
let _ = rt.block_on(tx.send([[0; 64]; 64]));

rx.try_recv().unwrap();
rt.block_on(rx.recv()).unwrap();
});
}

Expand Down
5 changes: 4 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ edition = "2018"
# [dependencies] instead.
[dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] }
tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }

async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] }
bytes = "0.6"
futures = "0.3.0"
http = "0.2"
Expand Down
15 changes: 9 additions & 6 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#![warn(rust_2018_idioms)]

use tokio::net::{TcpListener, TcpStream};
use tokio::stream::{Stream, StreamExt};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::{Stream, StreamExt};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};

use futures::SinkExt;
Expand Down Expand Up @@ -101,9 +101,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<String>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<String>;

/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
Expand All @@ -127,7 +124,7 @@ struct Peer {
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Rx,
rx: Pin<Box<dyn Stream<Item = String> + Send>>,
Comment on lines -130 to +127
Copy link
Contributor

@Darksonn Darksonn Dec 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This raises a question. The Rx type is not used in a way that expects a Stream, rather just in a way that expects something with a poll_next method. Currently the impl Stream is just removed, and the private poll_recv method is not made public.

What approach should we be taking here? I kinda feel like we should be exposing the poll_recv method, as the type supports it perfectly fine (there is only one receiver, so there are no intrusiveness concerns), but then we have both poll_next and poll_recv when we add back the Stream impl in ≥ half a year.

(edit for future readers: a poll_recv method has since been added)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this example need to use poll_* APIs at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we should rewrite it.

}

impl Shared {
Expand Down Expand Up @@ -159,11 +156,17 @@ impl Peer {
let addr = lines.get_ref().peer_addr()?;

// Create a channel for this peer
let (tx, rx) = mpsc::unbounded_channel();
let (tx, mut rx) = mpsc::unbounded_channel();

// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);

let rx = Box::pin(async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
});

Ok(Peer { lines, rx })
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, Decoder};

use std::env;
Expand Down
2 changes: 1 addition & 1 deletion examples/tinydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};

use futures::SinkExt;
Expand Down
2 changes: 1 addition & 1 deletion examples/tinyhttp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use http::{header::HeaderValue, Request, Response, StatusCode};
extern crate serde_derive;
use std::{env, error::Error, fmt, io};
use tokio::net::{TcpListener, TcpStream};
use tokio::stream::StreamExt;
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, Encoder, Framed};

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/udp-codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#![warn(rust_2018_idioms)]

use tokio::net::UdpSocket;
use tokio::stream::StreamExt;
use tokio::{io, time};
use tokio_stream::StreamExt;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

Expand Down
38 changes: 38 additions & 0 deletions tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[package]
name = "tokio-stream"
# When releasing to crates.io:
# - Remove path dependencies
# - Update html_root_url.
# - Update doc url
# - Cargo.toml
# - Update CHANGELOG.md.
# - Create "tokio-stream-0.1.x" git tag.
version = "0.1.0"
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-stream/0.1.0/tokio_stream"
description = """
Utilities to work with `Stream` and `tokio`.
"""
categories = ["asynchronous"]
publish = false

[features]
default = ["time"]
time = ["tokio/time"]

[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.0", path = "../tokio", features = ["sync"] }
async-stream = "0.3"

[dev-dependencies]
tokio = { version = "1.0", path = "../tokio", features = ["full"] }
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }

proptest = "0.10.0"
2 changes: 1 addition & 1 deletion tokio/src/stream/all.rs → tokio-stream/src/all.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/any.rs → tokio-stream/src/any.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/chain.rs → tokio-stream/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{Fuse, Stream};
use crate::{Fuse, Stream};

use core::pin::Pin;
use core::task::{Context, Poll};
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/collect.rs → tokio-stream/src/collect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down Expand Up @@ -26,7 +26,7 @@ pin_project! {
}
}

/// Convert from a [`Stream`](crate::stream::Stream).
/// Convert from a [`Stream`](crate::Stream).
///
/// This trait is not intended to be used directly. Instead, call
/// [`StreamExt::collect()`](super::StreamExt::collect).
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/stream/empty.rs → tokio-stream/src/empty.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::marker::PhantomData;
use core::pin::Pin;
Expand All @@ -24,7 +24,7 @@ unsafe impl<T> Sync for Empty<T> {}
/// Basic usage:
///
/// ```
/// use tokio::stream::{self, StreamExt};
/// use tokio_stream::{self as stream, StreamExt};
///
/// #[tokio::main]
/// async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/filter.rs → tokio-stream/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::fmt;
use core::pin::Pin;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::fmt;
use core::pin::Pin;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/fold.rs → tokio-stream/src/fold.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::future::Future;
use core::marker::PhantomPinned;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/stream/fuse.rs → tokio-stream/src/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use pin_project_lite::pin_project;
use std::pin::Pin;
Expand Down
21 changes: 16 additions & 5 deletions tokio/src/stream/iter.rs → tokio-stream/src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::Stream;
use crate::Stream;

use core::pin::Pin;
use core::task::{Context, Poll};
Expand All @@ -8,6 +8,7 @@ use core::task::{Context, Poll};
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
iter: I,
yield_amt: usize,
}

impl<I> Unpin for Iter<I> {}
Expand All @@ -20,7 +21,7 @@ impl<I> Unpin for Iter<I> {}
///
/// ```
/// # async fn dox() {
/// use tokio::stream::{self, StreamExt};
/// use tokio_stream::{self as stream, StreamExt};
///
/// let mut stream = stream::iter(vec![17, 19]);
///
Expand All @@ -35,6 +36,7 @@ where
{
Iter {
iter: i.into_iter(),
yield_amt: 0,
}
}

Expand All @@ -45,9 +47,18 @@ where
type Item = I::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(self.iter.next())
// TODO: add coop back
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
if self.yield_amt >= 32 {
self.yield_amt = 0;

cx.waker().wake_by_ref();

Poll::Pending
} else {
self.yield_amt += 1;

Poll::Ready(self.iter.next())
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down
Loading