现在我们遇到的问题是listener.incoming()
是一个阻塞的迭代器。当listener
等待传入的请求时执行器不能运行任何future
,在我们处理完前一个链接时我们不能处理一个新的链接。
为了修复这个问题,我们把从listener.incoming
中获得的阻塞的迭代器换成一个非阻塞的Stream
。Stream
类似于迭代器,但是可以在异步中使用。更多信息查看前面关于Stream
的章节。
我们把阻塞的std::net::TcpListener
替换为非阻塞的async_std::net::TcpListener
,修改我们的链接处理函数接收一个async_std::net::TcpStream
。
use async_std::prelude::*;
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
//<-- snip -->
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
异步版本的TcpListener
为listener.incoming()
实现了Stream
trait,这个变化带来了2个好处。第一个是listnener.incoming()
不再阻塞当前执行器。当没有传入的Tcp链接被处理时,执行器现在可以让给其他待定的futures
。
第二个好处是从Stream
中获取的元素可以使用Stream
的for_each_concurrent
方法,选择是否并发处理它。这里,我们将利用这个方法并发地处理每个传入的请求。我们需要从futures
crate中引入Stream
trait,因此Cargo.toml
文件看起来像这样:
+[dependencies]
+futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
现在,我们通过传递handle_connection
到一个闭包中,能够并发地处理每个链接。这个闭包函数获取了没个TcpStream
的所有权,新的TcpStream
可用时就运行它。同时handle_connection
不再阻塞,一个慢请求也就不会阻止其他请求完成了。
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
到目前为止,我们的示例主要介绍了并发(使用异步代码)作为并行性(使用线程)的替代方法。然而,异步代码和线程不是相互排斥的。在我们的示例中,for_each_concurrent
并发地处理每个链接,但是在同一个线程上实现的。async-std
crate也允许我们生成任务到单独的线程上。因为handle_connection
实现了Send
,同时也是非阻塞的,它能安全地使用async_std::task::spawn
。如下所示:
use async_std::task::spawn;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |stream| async move {
let stream = stream.unwrap();
spawn(handle_connection(stream));
})
.await;
}
现在我们就能同时使用并发和并行处理多个请求了。