Bug Report
Version
v0.12.3
Platform
x86_64-unknown-linux
Crates
Description
I use tonic as a gRPC client with Channel::balance_channel(10) against a multi-node server. The first call from the client succeeds. After the client sits idle for a while (about 1 minute), the TCP connection moves to the CLOSE_WAIT state (i.e. the server has closed the connection because there are no other streams). From then on, every later call to the server fails with an error like this:
2025-04-24 17:55:57.317958 ERROR grpc{trace_id="2125102617454885569858015d101d"}: src/grpc/handler.rs:47: err=Status { code: Cancelled, message: "operation was canceled", source: Some(tonic::transport::Error(Transport, hyper::Error(Canceled, "connection closed"))) }
2025-04-24 17:55:57.318031 ERROR grpc{trace_id="2125102617454885569858015d101d"}: src/grpc/handler.rs:63: Status {
    code: Cancelled,
    message: "operation was canceled",
    source: Some(
        tonic::transport::Error(
            Transport,
            hyper::Error(
                Canceled,
                "connection closed",
            ),
        ),
    ),
}
I am not sure whether my usage is wrong, so please help confirm. The channel is created as follows. Thank you.
fn create_channel<K>(vipkey: K) -> ChannelType
where
    K: AsRef<str> + Send + 'static,
{
    let (channel, tx) = Channel::balance_channel(10);
    let channel = ServiceBuilder::new()
        .timeout(Duration::from_secs(3))
        .service(channel);
    tokio::spawn(async move {
        let mut prev_hosts: Vec<Arc<Host>> = vec![];
        let vipkey = vipkey.as_ref();
        loop {
            let hosts = vs::srv_hosts(vipkey).await.unwrap_or_default();
            let hosts: Vec<_> = hosts.into_iter().filter(|h| h.valid).collect();
            // Insert endpoints for hosts that were not present in the previous round.
            for host in hosts.iter() {
                if prev_hosts
                    .iter()
                    .any(|h| h.ip == host.ip && h.port == host.port)
                {
                    continue;
                }
                let key = format!("http://{}:{}", host.ip.as_str(), host.port);
                let ep = Endpoint::from_shared(key.clone()).unwrap();
                let change = Change::Insert(key, ep);
                if let Err(err) = tx.send(change).await {
                    error!("send change error: {:?}", err);
                }
            }
            // Remove endpoints for hosts that have disappeared since the previous round.
            for host in prev_hosts.iter() {
                if hosts.iter().any(|h| h.ip == host.ip && h.port == host.port) {
                    continue;
                }
                let key = format!("http://{}:{}", host.ip.as_str(), host.port);
                let change = Change::Remove(key);
                if let Err(err) = tx.send(change).await {
                    error!("send change error: {:?}", err);
                }
            }
            prev_hosts = hosts;
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        }
    });
    channel
}
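To help rule out the discovery loop as the cause, its insert/remove diffing can be checked in isolation from tonic. The following is only a minimal sketch using the standard library: `HostAddr`, `Change`, `key`, and `diff_hosts` are hypothetical stand-ins introduced for illustration, not tonic types or part of the code above.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a discovered backend (not a tonic type).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HostAddr {
    pub ip: String,
    pub port: u16,
}

/// Hypothetical mirror of the insert/remove events sent to the balancer.
#[derive(Debug, PartialEq, Eq)]
pub enum Change {
    Insert(String),
    Remove(String),
}

/// Builds the same "http://ip:port" key format used in the report.
pub fn key(host: &HostAddr) -> String {
    format!("http://{}:{}", host.ip, host.port)
}

/// Compares two discovery snapshots. Inserts for newly seen hosts come
/// first, followed by removes for hosts that are no longer listed.
pub fn diff_hosts(prev: &[HostAddr], next: &[HostAddr]) -> Vec<Change> {
    let prev_set: HashSet<(&str, u16)> =
        prev.iter().map(|h| (h.ip.as_str(), h.port)).collect();
    let next_set: HashSet<(&str, u16)> =
        next.iter().map(|h| (h.ip.as_str(), h.port)).collect();

    let mut changes = Vec::new();
    for host in next {
        if !prev_set.contains(&(host.ip.as_str(), host.port)) {
            changes.push(Change::Insert(key(host)));
        }
    }
    for host in prev {
        if !next_set.contains(&(host.ip.as_str(), host.port)) {
            changes.push(Change::Remove(key(host)));
        }
    }
    changes
}
```

With a previous snapshot of hosts A and B and a new snapshot of B and C, `diff_hosts` returns an insert for C followed by a remove for A, and an unchanged snapshot returns no changes. Under that reading, a stable host list sends nothing to the balancer after the first round, so the closed connections would not be replaced by the discovery loop itself.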