Skip to content

Commit

Permalink
tokio: rewrite examples with async. (tokio-rs#1228)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubdos authored and carllerche committed Jul 9, 2019
1 parent f529928 commit 8279518
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 103 deletions.
32 changes: 15 additions & 17 deletions tokio/examples_old/echo-udp.rs → tokio/examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,47 @@
//!
//! Each line you type in to the `nc` terminal should be echo'd back to you!
#![feature(async_await)]
#![deny(warnings, rust_2018_idioms)]

use futures::try_ready;
use std::net::SocketAddr;
use std::{env, io};
use tokio;
use tokio::net::UdpSocket;
use tokio::prelude::*;

struct Server {
socket: UdpSocket,
buf: Vec<u8>,
to_send: Option<(usize, SocketAddr)>,
}

impl Future for Server {
type Item = ();
type Error = io::Error;
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
mut socket,
mut buf,
mut to_send,
} = self;

fn poll(&mut self) -> Poll<(), io::Error> {
loop {
// First we check to see if there's a message we need to echo back.
// If so then we try to send it back to the original source, waiting
// until it's writable and we're able to do so.
if let Some((size, peer)) = self.to_send {
let amt = try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
if let Some((size, peer)) = to_send {
let amt = socket.send_to(&buf[..size], &peer).await?;

println!("Echoed {}/{} bytes to {}", amt, size, peer);
self.to_send = None;
}

// If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back.
self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
to_send = Some(socket.recv_from(&mut buf).await?);
}
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>()?;

Expand All @@ -61,11 +64,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};

// This starts the server task.
//
// `map_err` handles the error by logging it and maps the future to a type
// that can be spawned.
//
// `tokio::run` spawns the task on the Tokio runtime and starts running.
tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
server.run().await?;
Ok(())
}
34 changes: 34 additions & 0 deletions tokio/examples/hello_world.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Hello world server.
//!
//! A simple client that opens a TCP stream, writes "hello world\n", and closes
//! the connection.
//!
//! You can test this out by running:
//!
//! ncat -l 6142
//!
//! And then in another terminal run:
//!
//! cargo run --example hello_world
#![deny(warnings, rust_2018_idioms)]
#![feature(async_await)]

use tokio;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "127.0.0.1:6142".parse()?;

// Open a TCP stream to the socket address.
//
// Note that this is the Tokio TcpStream, which is fully async.
let mut stream = TcpStream::connect(&addr).await?;
println!("created stream");
let result = stream.write(b"hello world\n").await;
println!("wrote to stream; success={:?}", result.is_ok());

Ok(())
}
30 changes: 15 additions & 15 deletions tokio/examples_old/udp-client.rs → tokio/examples/udp-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@
//! Please mind that since the UDP protocol doesn't have any capabilities to detect a broken
//! connection the server needs to be run first, otherwise the client will block forever.
#![feature(async_await)]
#![deny(warnings, rust_2018_idioms)]

use std::env;
use std::io::stdin;
use std::io::{stdin, Read};
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tokio::prelude::*;

fn get_stdin_data() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut buf = Vec::new();
stdin().read_to_end(&mut buf)?;
Ok(buf)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let remote_addr: SocketAddr = env::args()
.nth(1)
.unwrap_or("127.0.0.1:8080".into())
Expand All @@ -52,18 +53,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"[::]:0"
}
.parse()?;
let socket = UdpSocket::bind(&local_addr)?;
let mut socket = UdpSocket::bind(&local_addr)?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket
.send_dgram(get_stdin_data()?, &remote_addr)
.and_then(|(socket, _)| socket.recv_dgram(vec![0u8; MAX_DATAGRAM_SIZE]))
.map(|(_, data, len, _)| {
println!(
"Received {} bytes:\n{}",
len,
String::from_utf8_lossy(&data[..len])
)
})
.wait()?;
socket.connect(&remote_addr)?;
let data = get_stdin_data()?;
socket.send(&data).await?;
let mut data = vec![0u8; MAX_DATAGRAM_SIZE];
let len = socket.recv(&mut data).await?;
println!(
"Received {} bytes:\n{}",
len,
String::from_utf8_lossy(&data[..len])
);
Ok(())
}
31 changes: 17 additions & 14 deletions tokio/examples_old/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
//! Note how non-blocking threads are executed before blocking threads finish
//! their task.
#![feature(async_await)]
#![deny(warnings, rust_2018_idioms)]

use std::pin::Pin;
use std::thread;
use std::time::Duration;
use tokio;
Expand All @@ -24,18 +26,19 @@ struct BlockingFuture {
}

impl Future for BlockingFuture {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, _ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
println!("Blocking begin: {}!", self.value);
// Try replacing this part with commnted code
blocking(|| {
println!("Blocking part annotated: {}!", self.value);
thread::sleep(Duration::from_millis(1000));
println!("Blocking done annotated: {}!", self.value);
}).map(|result| match result {
Ok(result) => result,
Err(err) => panic!("Error in blocing block: {:?}", err),
})
.map_err(|err| panic!("Error in blocing block: {:?}", err))
// println!("Blocking part annotated: {}!", self.value);
// thread::sleep(Duration::from_millis(1000));
// println!("Blocking done annotated: {}!", self.value);
Expand All @@ -49,23 +52,21 @@ struct NonBlockingFuture {
}

impl Future for NonBlockingFuture {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, _ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
println!("Non-blocking done: {}!", self.value);
Ok(Async::Ready(()))
Poll::Ready(())
}
}

/// This future spawns child futures.
struct SpawningFuture;

impl Future for SpawningFuture {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(self: Pin<&mut Self>, _ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
for i in 0..8 {
let blocking_future = BlockingFuture { value: i };

Expand All @@ -75,13 +76,15 @@ impl Future for SpawningFuture {
let non_blocking_future = NonBlockingFuture { value: i };
tokio::spawn(non_blocking_future);
}
Ok(Async::Ready(()))
Poll::Ready(())
}
}

fn main() {
let spawning_future = SpawningFuture;

let runtime = Builder::new().core_threads(4).build().unwrap();
runtime.block_on_all(spawning_future).unwrap();
let mut runtime = Builder::new()
.core_threads(4)
.build().unwrap();
runtime.block_on_all(spawning_future);
}
57 changes: 0 additions & 57 deletions tokio/examples_old/hello_world.rs

This file was deleted.

0 comments on commit 8279518

Please sign in to comment.