Skip to content

dynamic-load-balance channel can not work #2257

Open
@c98

Description

@c98

Bug Report

Version

v0.12.3

Platform

x86_64-unknown-linux

Crates

Description

I use tonic as a grpc client with Channel::balance_channel(10), when client call server(multi node) at the first time, it works, and then wait a moment (1 min) the tcp connection status is turned to CLOSE_WAIT (i.e. the server close the connection as thers is no other streams), and from now on, the later call to server failed to response like this:

2025-04-24 17:55:57.317958 ERROR grpc{trace_id="2125102617454885569858015d101d"}: src/grpc/handler.rs:47: err=Status { code: Cancelled, message: "operation was canceled", source: Some(tonic::transport::Error(Transport, hyper::Error(Canceled, "connection closed"))
) }
2025-04-24 17:55:57.318031 ERROR grpc{trace_id="2125102617454885569858015d101d"}: src/grpc/handler.rs:63: Status {
    code: Cancelled,
    message: "operation was canceled",
    source: Some(
        tonic::transport::Error(
            Transport,
            hyper::Error(
                Canceled,
                "connection closed",
            ),
        ),
    ),
}

TCP Status
Image

I don't know if there is something wrong with my usage. Please help confirm. The following is the creation of the channel, thank you.

fn create_channel<K>(vipkey: K) -> ChannelType
where
    K: AsRef<str> + Send + 'static,
{
    let (channel, tx) = Channel::balance_channel(10);
    let channel = ServiceBuilder::new()
        .timeout(Duration::from_secs(3))
        .service(channel);

    tokio::spawn(async move {
        let mut prev_hosts: Vec<Arc<Host>> = vec![];
        let vipkey = vipkey.as_ref();
        loop {
            let hosts = vs::srv_hosts(vipkey).await.unwrap_or_default();
            let hosts: Vec<_> = hosts.into_iter().filter(|h| h.valid).collect();
            for host in hosts.iter() {
                if prev_hosts
                    .iter()
                    .any(|h| h.ip == host.ip && h.port == host.port)
                {
                    continue;
                }
                let key = format!("http://{}:{}", host.ip.as_str(), host.port);
                let ep = Endpoint::from_shared(key.clone()).unwrap();
                let change = Change::Insert(key, ep);
                if let Err(err) = tx.send(change).await {
                    error!("send change error: {:?}", err);
                }
            }

            for host in prev_hosts.iter() {
                if hosts.iter().any(|h| h.ip == host.ip && h.port == host.port) {
                    continue;
                }
                let key = format!("http://{}:{}", host.ip.as_str(), host.port);
                let change = Change::Remove(key);
                if let Err(err) = tx.send(change).await {
                    error!("send change error: {:?}", err);
                }
            }
            prev_hosts = hosts;
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        }
    });

    channel
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions