Handling error while processing stream. #94

Open
@alishir

I have a simple stream of TcpStream::connect futures. I want to process the elements of the stream concurrently, so I used buffer_unordered, but I couldn't figure out how to turn the resulting stream into a future that I can run with tokio::run. Also, some connections may be refused because of network conditions, so I couldn't simply use for_each over the stream, which stops at the first error.

Here is the code so far:

extern crate bytes;
extern crate tokio;

use bytes::BytesMut;
use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;

fn main() {
    let srv_list = vec![
        "127.0.0.1:12345",
        "127.0.0.1:12346",
        "127.0.0.1:12347",
        "127.0.0.1:12348",
        "127.0.0.1:12349",
    ];
    let conn_fut: Vec<_> = srv_list
        .iter()
        .map(|addr| {
            let addr = addr.parse().unwrap();

            TcpStream::connect(&addr)
                .and_then(move |_stream| {
                    println!("connected to address: {:?}", addr);
                    // and_then needs a value convertible to a future, so return Ok(()).
                    Ok(())
                })
                .map_err(|err| {
                    println!("couldn't connect, {:?}", err);
                })
        })
        .collect();

    let task = stream::iter_ok::<_, ()>(conn_fut);
    let task = task.buffer_unordered(5);
    /*
     what should I do here?!
   */
    tokio::run(task);
}
