From 142a9e9e8806cd3425568179d0afbe0c23ea2487 Mon Sep 17 00:00:00 2001 From: Huo Linhe Date: Tue, 8 Aug 2023 22:15:32 +0800 Subject: [PATCH] fix(ws): fix some panics in websocket impls Close [TS-3796](https://jira.taosdata.com:18080/browse/TS-3796) --- taos-ws/src/consumer/mod.rs | 2 +- taos-ws/src/query/asyn.rs | 39 +++++++++++++++++++++++++++---------- taos-ws/src/stmt/mod.rs | 5 ++++- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/taos-ws/src/consumer/mod.rs b/taos-ws/src/consumer/mod.rs index 18727356..b3c1a3fb 100644 --- a/taos-ws/src/consumer/mod.rs +++ b/taos-ws/src/consumer/mod.rs @@ -696,7 +696,7 @@ impl TmqBuilder { let mut close_listener = rx.clone(); let sending_url = url.clone(); - static PING_INTERVAL: u64 = 30; + static PING_INTERVAL: u64 = 29; const PING: &[u8] = b"TAOSX"; tokio::spawn(async move { diff --git a/taos-ws/src/query/asyn.rs b/taos-ws/src/query/asyn.rs index 64d19b30..4986a6cb 100644 --- a/taos-ws/src/query/asyn.rs +++ b/taos-ws/src/query/asyn.rs @@ -12,6 +12,7 @@ use taos_query::{ block_in_place_or_global, AsyncFetchable, AsyncQueryable, DeError, DsnError, IntoDsn, }; use thiserror::Error; +use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use taos_query::prelude::tokio; use tokio::net::TcpStream; @@ -116,7 +117,9 @@ impl WsQuerySender { } // handle the error log::trace!("[req id: {req_id}] message sent, wait for receiving"); - Ok(rx.await.unwrap().map_err(Error::from)?) + let res = rx.await.unwrap().map_err(Error::from)?; + log::trace!("[req id: {req_id}] message received: {res:?}"); + Ok(res) } async fn send_only(&self, msg: WsSend) -> RawResult<()> { let send_timeout = Duration::from_millis(1000); @@ -299,6 +302,17 @@ async fn read_queries( is_v3: bool, mut close_listener: watch::Receiver, ) { + let ws3 = ws2.clone(); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_secs(29)); + loop { + interval.tick().await; + if let Err(err) = ws3.send(Message::Ping(b"TAOSX".to_vec())).await { + log::trace!("sending ping message error: {err:?}"); + break; + } + } + }); 'ws: loop { tokio::select! { Some(message) = reader.next() => { @@ -312,7 +326,9 @@ async fn read_queries( WsRecvData::Query(_) => { if let Some((_, sender)) = queries_sender.remove(&req_id) { - sender.send(ok.map(|_| data)).unwrap(); + if let Err(err) = sender.send(ok.map(|_| data)) { + log::error!("send data with error: {err:?}"); + } } else { debug_assert!(!queries_sender.contains_key(&req_id)); log::warn!("req_id {req_id} not detected, message might be lost"); @@ -413,6 +429,8 @@ async fn read_queries( } } Message::Close(close) => { + // taosAdapter should never send close frame to client. + // So all close frames should be treated as error. if let Some(close) = close { log::warn!("websocket received close frame: {close:?}"); @@ -420,12 +438,15 @@ async fn read_queries( for e in queries_sender.iter() { keys.push(*e.key()); } - // queries_sender.for_each_async(|k, _| { - // keys.push(*k); - // }).await; + let reason = match close.code { + CloseCode::Size => { + format!("Message length reaches max limit (code: {})", close.code) + } + _ => format!("{}", close), + }; for k in keys { if let Some((_, sender)) = queries_sender.remove(&k) { - let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), close.reason.to_string()))); + let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), reason.to_string()))); } } } else { @@ -434,9 +455,6 @@ async fn read_queries( for e in queries_sender.iter() { keys.push(*e.key()); } - // queries_sender.for_each_async(|k, _| { - // keys.push(*k); - // }).await; for k in keys { if let Some((_, sender)) = queries_sender.remove(&k) { let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), "received close message"))); @@ -450,7 +468,7 @@ async fn read_queries( } Message::Pong(_) => { // do nothing - log::warn!("received (unexpected) pong message, do nothing"); + log::trace!("received pong message, do nothing"); } Message::Frame(frame) => { // do nothing @@ -611,6 +629,7 @@ impl WsTaos { Some(msg) = msg_recv.recv() => { // dbg!(&msg); if let Err(err) = sender.send(msg).await { + log::error!("Write websocket error: {}", err); let mut keys = Vec::new(); queries3.iter().for_each(|r| keys.push(*r.key())); // queries3.for_each_async(|k, _| { diff --git a/taos-ws/src/stmt/mod.rs b/taos-ws/src/stmt/mod.rs index a911f0dd..d3e880cc 100644 --- a/taos-ws/src/stmt/mod.rs +++ b/taos-ws/src/stmt/mod.rs @@ -291,7 +291,10 @@ impl Stmt { loop { tokio::select! { Some(msg) = msg_recv.recv() => { - sender.send(msg).await.unwrap(); + if let Err(err) = sender.send(msg).await { + // + break; + } } _ = rx.changed() => { log::trace!("close sender task");