Skip to content

Commit

Permalink
tcp client: fix subscribe deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
beni69 committed Aug 1, 2023
1 parent bfd9ee8 commit 079d277
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
18 changes: 11 additions & 7 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,37 @@ udp = []
http = ["async", "dep:reqwest"]
ws = ["async", "dep:tokio-tungstenite", "dep:futures"]

[[example]]
name = "latency"
required-features = ["default"]

[[example]]
name = "simple"
required-features = ["roland"]
required-features = ["default", "roland"]

[[example]]
name = "blink"
required-features = ["gpio"]
required-features = ["default", "gpio"]

[[example]]
name = "gpio_in"
required-features = ["gpio"]
required-features = ["default", "gpio"]

[[example]]
name = "controller"
required-features = ["roland", "gpio", "camloc"]
required-features = ["default", "roland", "gpio", "camloc"]

[[example]]
name = "sensor"
required-features = ["roland"]
required-features = ["default", "roland"]

[[example]]
name = "circle"
required-features = ["roland"]
required-features = ["default", "roland"]

[[example]]
name = "tcp"
required-features = ["roland"]
required-features = ["tcp", "roland"]

[[example]]
name = "async"
Expand Down
20 changes: 12 additions & 8 deletions client/src/transports/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type D<'a> = bincode::Deserializer<
type Handler = Box<dyn Send + Sync + (for<'a> FnMut(D<'a>) -> Result<()>)>;

struct TcpInner {
handlers: std::sync::Mutex<HashMap<u32, Handler>>,
handlers: std::sync::Mutex<HashMap<u32, (Handler, bool)>>,
events: std::sync::Mutex<HashMap<roblib::event::ConcreteType, u32>>,
running: std::sync::RwLock<bool>,
}
Expand Down Expand Up @@ -75,12 +75,15 @@ impl Tcp {
let mut c = Cursor::new(&buf[Self::HEADER..end]);
let id: u32 = bincode::Options::deserialize_from(bin, &mut c)?;

let mut handlers = inner.handlers.lock().unwrap();
let Some(handler) = handlers.get_mut(&id) else {
let Some(mut handler) = inner.handlers.lock().unwrap().remove(&id) else {
return Err(anyhow::Error::msg("received response for unknown id"));
};

handler(bincode::Deserializer::with_reader(&mut c, bin))?;
handler.0(bincode::Deserializer::with_reader(&mut c, bin))?;

if handler.1 {
inner.handlers.lock().unwrap().insert(id, handler);
}
}
}

Expand All @@ -101,8 +104,7 @@ impl Tcp {
tx.send(r).unwrap();
Ok::<(), anyhow::Error>(())
});

self.inner.handlers.lock().unwrap().insert(id, a);
self.inner.handlers.lock().unwrap().insert(id, (a, false));

rx.recv()?
} else {
Expand All @@ -122,7 +124,6 @@ impl Transport for Tcp {
drop(id_handle);

let res = self.cmd_id(cmd, id);
self.inner.handlers.lock().unwrap().remove(&id);
res
}
}
Expand All @@ -142,7 +143,10 @@ impl Subscribable for Tcp {

self.inner.handlers.lock().unwrap().insert(
id,
Box::new(move |mut des| handler(E::Item::deserialize(&mut des)?)),
(
Box::new(move |mut des| handler(E::Item::deserialize(&mut des)?)),
true,
),
);
self.inner.events.lock().unwrap().insert(ev, id);

Expand Down
7 changes: 3 additions & 4 deletions client/src/transports/ws.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::{collections::HashMap, io::Cursor, sync::Arc};

use super::{SubscribableAsync, TransportAsync};
use anyhow::Result;
use async_trait::async_trait;
use futures::{SinkExt, TryStreamExt};
use futures::{executor::block_on, SinkExt, TryStreamExt};
use roblib::{
cmd::{self, has_return},
event::{ConcreteType, Event},
text_format,
};
use serde::Deserialize;
use std::{collections::HashMap, io::Cursor, sync::Arc};
use tokio::{
net::TcpStream,
sync::{
Expand Down Expand Up @@ -183,6 +182,6 @@ impl SubscribableAsync for Ws {
impl Drop for Ws {
fn drop(&mut self) {
let _ = self.sender.send(Message::Close(None));
let _ = futures::executor::block_on(self.handle.take().unwrap());
let _ = block_on(self.handle.take().unwrap());
}
}

0 comments on commit 079d277

Please sign in to comment.