Skip to content

Commit

Permalink
fix(ws): fix some panics in websocket impls
Browse files Browse the repository at this point in the history
  • Loading branch information
zitsen committed Aug 8, 2023
1 parent d0fe19c commit 142a9e9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
2 changes: 1 addition & 1 deletion taos-ws/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ impl TmqBuilder {
let mut close_listener = rx.clone();

let sending_url = url.clone();
static PING_INTERVAL: u64 = 30;
static PING_INTERVAL: u64 = 29;
const PING: &[u8] = b"TAOSX";

tokio::spawn(async move {
Expand Down
39 changes: 29 additions & 10 deletions taos-ws/src/query/asyn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use taos_query::{
block_in_place_or_global, AsyncFetchable, AsyncQueryable, DeError, DsnError, IntoDsn,
};
use thiserror::Error;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;

use taos_query::prelude::tokio;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -116,7 +117,9 @@ impl WsQuerySender {
}
// handle the error
log::trace!("[req id: {req_id}] message sent, wait for receiving");
Ok(rx.await.unwrap().map_err(Error::from)?)
let res = rx.await.unwrap().map_err(Error::from)?;
log::trace!("[req id: {req_id}] message received: {res:?}");
Ok(res)
}
async fn send_only(&self, msg: WsSend) -> RawResult<()> {
let send_timeout = Duration::from_millis(1000);
Expand Down Expand Up @@ -299,6 +302,17 @@ async fn read_queries(
is_v3: bool,
mut close_listener: watch::Receiver<bool>,
) {
let ws3 = ws2.clone();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(29));
loop {
interval.tick().await;
if let Err(err) = ws3.send(Message::Ping(b"TAOSX".to_vec())).await {
log::trace!("sending ping message error: {err:?}");
break;
}
}
});
'ws: loop {
tokio::select! {
Some(message) = reader.next() => {
Expand All @@ -312,7 +326,9 @@ async fn read_queries(
WsRecvData::Query(_) => {
if let Some((_, sender)) = queries_sender.remove(&req_id)
{
sender.send(ok.map(|_| data)).unwrap();
if let Err(err) = sender.send(ok.map(|_| data)) {
log::error!("send data with error: {err:?}");
}
} else {
debug_assert!(!queries_sender.contains_key(&req_id));
log::warn!("req_id {req_id} not detected, message might be lost");
Expand Down Expand Up @@ -413,19 +429,24 @@ async fn read_queries(
}
}
Message::Close(close) => {
// taosAdapter should never send close frame to client.
// So all close frames should be treated as error.
if let Some(close) = close {
log::warn!("websocket received close frame: {close:?}");

let mut keys = Vec::new();
for e in queries_sender.iter() {
keys.push(*e.key());
}
// queries_sender.for_each_async(|k, _| {
// keys.push(*k);
// }).await;
let reason = match close.code {
CloseCode::Size => {
format!("Message length reaches max limit (code: {})", close.code)
}
_ => format!("{}", close),
};
for k in keys {
if let Some((_, sender)) = queries_sender.remove(&k) {
let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), close.reason.to_string())));
let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), reason.to_string())));
}
}
} else {
Expand All @@ -434,9 +455,6 @@ async fn read_queries(
for e in queries_sender.iter() {
keys.push(*e.key());
}
// queries_sender.for_each_async(|k, _| {
// keys.push(*k);
// }).await;
for k in keys {
if let Some((_, sender)) = queries_sender.remove(&k) {
let _ = sender.send(Err(RawError::new(WS_ERROR_NO::CONN_CLOSED.as_code(), "received close message")));
Expand All @@ -450,7 +468,7 @@ async fn read_queries(
}
Message::Pong(_) => {
// do nothing
log::warn!("received (unexpected) pong message, do nothing");
log::trace!("received pong message, do nothing");
}
Message::Frame(frame) => {
// do nothing
Expand Down Expand Up @@ -611,6 +629,7 @@ impl WsTaos {
Some(msg) = msg_recv.recv() => {
// dbg!(&msg);
if let Err(err) = sender.send(msg).await {
log::error!("Write websocket error: {}", err);
let mut keys = Vec::new();
queries3.iter().for_each(|r| keys.push(*r.key()));
// queries3.for_each_async(|k, _| {
Expand Down
5 changes: 4 additions & 1 deletion taos-ws/src/stmt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ impl Stmt {
loop {
tokio::select! {
Some(msg) = msg_recv.recv() => {
sender.send(msg).await.unwrap();
if let Err(err) = sender.send(msg).await {
//
break;
}
}
_ = rx.changed() => {
log::trace!("close sender task");
Expand Down

0 comments on commit 142a9e9

Please sign in to comment.