Skip to content

Latest commit

 

History

History
76 lines (56 loc) · 3.26 KB

9-2-Handling Connections Concurrently.md

File metadata and controls

76 lines (56 loc) · 3.26 KB

现在我们遇到的问题是listener.incoming()是一个阻塞的迭代器。当listener等待传入的请求时执行器不能运行任何future,在我们处理完前一个链接时我们不能处理一个新的链接。

为了修复这个问题,我们把从listener.incoming中获得的阻塞的迭代器换成一个非阻塞的StreamStream类似于迭代器,但是可以在异步中使用。更多信息查看前面关于Stream的章节。

我们把阻塞的std::net::TcpListener替换为非阻塞的async_std::net::TcpListener,修改我们的链接处理函数接收一个async_std::net::TcpStream

use async_std::prelude::*;

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    //<-- snip -->
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

异步版本的TcpListenerlistener.incoming()实现了Stream trait,这个变化带来了2个好处。第一个是listnener.incoming()不再阻塞当前执行器。当没有传入的Tcp链接被处理时,执行器现在可以让给其他待定的futures

第二个好处是从Stream中获取的元素可以使用Streamfor_each_concurrent方法,选择是否并发处理它。这里,我们将利用这个方法并发地处理每个传入的请求。我们需要从futures crate中引入Stream trait,因此Cargo.toml文件看起来像这样:

+[dependencies]
+futures = "0.3"

 [dependencies.async-std]
 version = "1.6"
 features = ["attributes"]

现在,我们通过传递handle_connection到一个闭包中,能够并发地处理每个链接。这个闭包函数获取了没个TcpStream的所有权,新的TcpStream可用时就运行它。同时handle_connection不再阻塞,一个慢请求也就不会阻止其他请求完成了。

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use futures::stream::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            handle_connection(tcpstream).await;
        })
        .await;
}

并行处理请求

到目前为止,我们的示例主要介绍了并发(使用异步代码)作为并行性(使用线程)的替代方法。然而,异步代码和线程不是相互排斥的。在我们的示例中,for_each_concurrent并发地处理每个链接,但是在同一个线程上实现的。async-std crate也允许我们生成任务到单独的线程上。因为handle_connection实现了Send,同时也是非阻塞的,它能安全地使用async_std::task::spawn。如下所示:

use async_std::task::spawn;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |stream| async move {
            let stream = stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;
}

现在我们就能同时使用并发和并行处理多个请求了。